1f8be1dfab2504bcb0154cadde496d79afc0f8fb
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / AggregationSynchronizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25
26 package org.openecomp.sparky.synchronizer;
27
28 import static java.util.concurrent.CompletableFuture.supplyAsync;
29
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.UnknownHostException;
33 import java.sql.Timestamp;
34 import java.text.SimpleDateFormat;
35 import java.util.ArrayList;
36 import java.util.Collection;
37 import java.util.Deque;
38 import java.util.EnumSet;
39 import java.util.HashMap;
40 import java.util.Iterator;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Map.Entry;
44 import java.util.Set;
45 import java.util.concurrent.ConcurrentHashMap;
46 import java.util.concurrent.ConcurrentLinkedDeque;
47 import java.util.concurrent.ExecutorService;
48 import java.util.concurrent.atomic.AtomicInteger;
49 import java.util.function.Supplier;
50
51 import javax.json.Json;
52
53 import org.openecomp.cl.api.Logger;
54 import org.openecomp.cl.eelf.LoggerFactory;
55 import org.openecomp.sparky.config.oxm.OxmEntityDescriptor;
56 import org.openecomp.sparky.dal.NetworkTransaction;
57 import org.openecomp.sparky.dal.aai.config.ActiveInventoryConfig;
58 import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig;
59 import org.openecomp.sparky.dal.rest.HttpMethod;
60 import org.openecomp.sparky.dal.rest.OperationResult;
61 import org.openecomp.sparky.logging.AaiUiMsgs;
62 import org.openecomp.sparky.synchronizer.config.SynchronizerConfiguration;
63 import org.openecomp.sparky.synchronizer.entity.AggregationEntity;
64 import org.openecomp.sparky.synchronizer.entity.MergableEntity;
65 import org.openecomp.sparky.synchronizer.entity.SelfLinkDescriptor;
66 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
67 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
68 import org.openecomp.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
69 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchPut;
70 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchRetrieval;
71 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchUpdate;
72 import org.openecomp.sparky.util.NodeUtils;
73 import org.slf4j.MDC;
74
75 import org.openecomp.cl.mdc.MdcContext;
76 import com.fasterxml.jackson.core.JsonProcessingException;
77 import com.fasterxml.jackson.databind.JsonNode;
78 import com.fasterxml.jackson.databind.ObjectReader;
79 import com.fasterxml.jackson.databind.node.ArrayNode;
80
81 /**
82  * The Class AutosuggestionSynchronizer.
83  */
84 public class AggregationSynchronizer extends AbstractEntitySynchronizer
85     implements IndexSynchronizer {
86
87   /**
88    * The Class RetryAggregationEntitySyncContainer.
89    */
90   private class RetryAggregationEntitySyncContainer {
91     NetworkTransaction txn;
92     AggregationEntity ae;
93
94     /**
95      * Instantiates a new retry aggregation entity sync container.
96      *
97      * @param txn the txn
98      * @param ae the se
99      */
100     public RetryAggregationEntitySyncContainer(NetworkTransaction txn, AggregationEntity ae) {
101       this.txn = txn;
102       this.ae = ae;
103     }
104
105     public NetworkTransaction getNetworkTransaction() {
106       return txn;
107     }
108
109     public AggregationEntity getAggregationEntity() {
110       return ae;
111     }
112   }
113   
114   private static final Logger LOG =
115       LoggerFactory.getInstance().getLogger(AggregationSynchronizer.class);
116   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
117
118   private boolean allWorkEnumerated;
119   private Deque<SelfLinkDescriptor> selflinks;
120   private Deque<RetryAggregationEntitySyncContainer> retryQueue;
121   private Map<String, Integer> retryLimitTracker;
122   protected ExecutorService esPutExecutor;
123   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
124   private boolean syncInProgress;
125   private Map<String, String> contextMap;
126   private String entityType;
127
128   /**
129    * Instantiates a new entity aggregation synchronizer.
130    *
131    * @param indexName the index name
132    * @throws Exception the exception
133    */
134   public AggregationSynchronizer(String entityType, String indexName) throws Exception {
135     super(LOG, "AGGES-" + indexName.toUpperCase(), 2, 5, 5, indexName); // multiple Autosuggestion
136                                                                        // Entity Synchronizer will
137                                                                        // run for different indices
138     
139     this.entityType = entityType;
140     this.allWorkEnumerated = false;
141     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
142     this.synchronizerName = "Entity Aggregation Synchronizer";
143     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
144     this.syncInProgress = false;
145     this.allWorkEnumerated = false;
146     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
147     this.retryQueue = new ConcurrentLinkedDeque<RetryAggregationEntitySyncContainer>();
148     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
149     
150     this.esPutExecutor = NodeUtils.createNamedExecutor("AGGES-ES-PUT", 1, LOG);
151     Map<String, OxmEntityDescriptor> descriptor = new HashMap<String, OxmEntityDescriptor>();
152     descriptor.put(entityType, oxmModelLoader.getEntityDescriptors().get(entityType));
153     this.aaiEntityStats.initializeCountersFromOxmEntityDescriptors(
154         descriptor);
155     this.esEntityStats.initializeCountersFromOxmEntityDescriptors(
156         descriptor);
157     this.contextMap = MDC.getCopyOfContextMap();
158   }
159
160   /**
161    * Collect all the work.
162    *
163    * @return the operation state
164    */
165   private OperationState collectAllTheWork() {
166     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
167     final String entity = this.getEntityType();
168     try {
169
170       aaiWorkOnHand.set(1);
171
172       supplyAsync(new Supplier<Void>() {
173
174         @Override
175         public Void get() {
176           MDC.setContextMap(contextMap);
177           OperationResult typeLinksResult = null;
178           try {
179             typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(entity);
180             aaiWorkOnHand.decrementAndGet();
181             processEntityTypeSelfLinks(typeLinksResult);
182           } catch (Exception exc) {
183             // TODO -> LOG, what should be logged here?
184           }
185
186           return null;
187         }
188
189       }, aaiExecutor).whenComplete((result, error) -> {
190
191         if (error != null) {
192           LOG.error(AaiUiMsgs.ERROR_GENERIC,
193               "An error occurred getting data from AAI. Error = " + error.getMessage());
194         }
195       });
196
197       while (aaiWorkOnHand.get() != 0) {
198
199         if (LOG.isDebugEnabled()) {
200           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
201         }
202
203         Thread.sleep(1000);
204       }
205
206       aaiWorkOnHand.set(selflinks.size());
207       allWorkEnumerated = true;
208       syncEntityTypes();
209
210       while (!isSyncDone()) {
211         performRetrySync();
212         Thread.sleep(1000);
213       }
214
215       /*
216        * Make sure we don't hang on to retries that failed which could cause issues during future
217        * syncs
218        */
219       retryLimitTracker.clear();
220
221     } catch (Exception exc) {
222       // TODO -> LOG, waht should be logged here?
223     }
224
225     return OperationState.OK;
226   }
227   
228   
229   /**
230    * Perform retry sync.
231    */
232   private void performRetrySync() {
233     while (retryQueue.peek() != null) {
234
235       RetryAggregationEntitySyncContainer rsc = retryQueue.poll();
236       if (rsc != null) {
237
238         AggregationEntity ae = rsc.getAggregationEntity();
239         NetworkTransaction txn = rsc.getNetworkTransaction();
240
241         String link = null;
242         try {
243           /*
244            * In this retry flow the se object has already derived its fields
245            */
246           link = getElasticFullUrl("/" + ae.getId(), getIndexName());
247         } catch (Exception exc) {
248           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
249         }
250
251         if (link != null) {
252           NetworkTransaction retryTransaction = new NetworkTransaction();
253           retryTransaction.setLink(link);
254           retryTransaction.setEntityType(txn.getEntityType());
255           retryTransaction.setDescriptor(txn.getDescriptor());
256           retryTransaction.setOperationType(HttpMethod.GET);
257
258           /*
259            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
260            * called incrementAndGet when queuing the failed PUT!
261            */
262
263           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, esDataProvider),
264               esExecutor).whenComplete((result, error) -> {
265
266                 esWorkOnHand.decrementAndGet();
267
268                 if (error != null) {
269                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
270                 } else {
271                   updateElasticSearchCounters(result);
272                   performDocumentUpsert(result, ae);
273                 }
274               });
275         }
276
277       }
278     }
279   }
280   
281   /**
282    * Perform document upsert.
283    *
284    * @param esGetTxn the es get txn
285    * @param ae the ae
286    */
287   protected void performDocumentUpsert(NetworkTransaction esGetTxn, AggregationEntity ae) {
288     /**
289      * <p>
290      * <ul>
291      * As part of the response processing we need to do the following:
292      * <li>1. Extract the version (if present), it will be the ETAG when we use the
293      * Search-Abstraction-Service
294      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
295      * tag
296      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
297      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
298      * </ul>
299      * </p>
300      */
301     String link = null;
302     try {
303       link = getElasticFullUrl("/" + ae.getId(), getIndexName());
304     } catch (Exception exc) {
305       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
306       return;
307     }
308
309     String versionNumber = null;
310     boolean wasEntryDiscovered = false;
311     if (esGetTxn.getOperationResult().getResultCode() == 404) {
312       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, ae.getEntityPrimaryKeyValue());
313     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
314       wasEntryDiscovered = true;
315       try {
316         versionNumber = NodeUtils.extractFieldValueFromObject(
317             NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
318             "_version");
319       } catch (IOException exc) {
320         String message =
321             "Error extracting version number from response, aborting aggregation entity sync of "
322                 + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
323         LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
324         return;
325       }
326     } else {
327       /*
328        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
329        * return.
330        */
331       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
332           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
333       return;
334     }
335
336     try {
337       String jsonPayload = null;
338       if (wasEntryDiscovered) {
339         try {
340           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
341           NodeUtils.extractObjectsByKey(
342               NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
343               "_source", sourceObject);
344
345           if (!sourceObject.isEmpty()) {
346             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
347             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
348             ObjectReader updater = mapper.readerForUpdating(me);
349             MergableEntity merged = updater.readValue(ae.getIndexDocumentJson());
350             jsonPayload = mapper.writeValueAsString(merged);
351           }
352         } catch (IOException exc) {
353           String message =
354               "Error extracting source value from response, aborting aggregation entity sync of "
355                   + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
356           LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
357           return;
358         }
359       } else {
360         jsonPayload = ae.getIndexDocumentJson();
361       }
362
363       if (wasEntryDiscovered) {
364         if (versionNumber != null && jsonPayload != null) {
365
366           String requestPayload = esDataProvider.buildBulkImportOperationRequest(getIndexName(),
367               ElasticSearchConfig.getConfig().getType(), ae.getId(), versionNumber, jsonPayload);
368
369           NetworkTransaction transactionTracker = new NetworkTransaction();
370           transactionTracker.setEntityType(esGetTxn.getEntityType());
371           transactionTracker.setDescriptor(esGetTxn.getDescriptor());
372           transactionTracker.setOperationType(HttpMethod.PUT);
373
374           esWorkOnHand.incrementAndGet();
375           supplyAsync(new PerformElasticSearchUpdate(ElasticSearchConfig.getConfig().getBulkUrl(),
376               requestPayload, esDataProvider, transactionTracker), esPutExecutor)
377                   .whenComplete((result, error) -> {
378
379                     esWorkOnHand.decrementAndGet();
380
381                     if (error != null) {
382                       String message = "Aggregation entity sync UPDATE PUT error - "
383                           + error.getLocalizedMessage();
384                       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
385                     } else {
386                       updateElasticSearchCounters(result);
387                       processStoreDocumentResult(result, esGetTxn, ae);
388                     }
389                   });
390         }
391
392       } else {
393         if (link != null && jsonPayload != null) {
394
395           NetworkTransaction updateElasticTxn = new NetworkTransaction();
396           updateElasticTxn.setLink(link);
397           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
398           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
399           updateElasticTxn.setOperationType(HttpMethod.PUT);
400
401           esWorkOnHand.incrementAndGet();
402           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, esDataProvider),
403               esPutExecutor).whenComplete((result, error) -> {
404
405                 esWorkOnHand.decrementAndGet();
406
407                 if (error != null) {
408                   String message =
409                       "Aggregation entity sync UPDATE PUT error - " + error.getLocalizedMessage();
410                   LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
411                 } else {
412                   updateElasticSearchCounters(result);
413                   processStoreDocumentResult(result, esGetTxn, ae);
414                 }
415               });
416         }
417       }
418     } catch (Exception exc) {
419       String message = "Exception caught during aggregation entity sync PUT operation. Message - "
420           + exc.getLocalizedMessage();
421       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
422     }
423   }
424   
425   /**
426    * Should allow retry.
427    *
428    * @param id the id
429    * @return true, if successful
430    */
431   private boolean shouldAllowRetry(String id) {
432     boolean isRetryAllowed = true;
433     if (retryLimitTracker.get(id) != null) {
434       Integer currentCount = retryLimitTracker.get(id);
435       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
436         isRetryAllowed = false;
437         String message = "Aggregation entity re-sync limit reached for " + id
438             + ", re-sync will no longer be attempted for this entity";
439         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
440       } else {
441         Integer newCount = new Integer(currentCount.intValue() + 1);
442         retryLimitTracker.put(id, newCount);
443       }
444     } else {
445       Integer firstRetryCount = new Integer(1);
446       retryLimitTracker.put(id, firstRetryCount);
447     }
448
449     return isRetryAllowed;
450   }
451   
452   /**
453    * Process store document result.
454    *
455    * @param esPutResult the es put result
456    * @param esGetResult the es get result
457    * @param ae the ae
458    */
459   private void processStoreDocumentResult(NetworkTransaction esPutResult,
460       NetworkTransaction esGetResult, AggregationEntity ae) {
461
462     OperationResult or = esPutResult.getOperationResult();
463
464     if (!or.wasSuccessful()) {
465       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
466
467         if (shouldAllowRetry(ae.getId())) {
468           esWorkOnHand.incrementAndGet();
469
470           RetryAggregationEntitySyncContainer rsc =
471               new RetryAggregationEntitySyncContainer(esGetResult, ae);
472           retryQueue.push(rsc);
473
474           String message = "Store document failed during aggregation entity synchronization"
475               + " due to version conflict. Entity will be re-synced.";
476           LOG.warn(AaiUiMsgs.ERROR_GENERIC, message);
477         }
478       } else {
479         String message =
480             "Store document failed during aggregation entity synchronization with result code "
481                 + or.getResultCode() + " and result message " + or.getResult();
482         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
483       }
484     }
485   }
486   
487   /**
488    * Sync entity types.
489    */
490   private void syncEntityTypes() {
491
492     while (selflinks.peek() != null) {
493
494       SelfLinkDescriptor linkDescriptor = selflinks.poll();
495       aaiWorkOnHand.decrementAndGet();
496
497       OxmEntityDescriptor descriptor = null;
498
499       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
500
501         descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
502
503         if (descriptor == null) {
504           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
505           // go to next element in iterator
506           continue;
507         }
508
509         NetworkTransaction txn = new NetworkTransaction();
510         txn.setDescriptor(descriptor);
511         txn.setLink(linkDescriptor.getSelfLink());
512         txn.setOperationType(HttpMethod.GET);
513         txn.setEntityType(linkDescriptor.getEntityType());
514
515         aaiWorkOnHand.incrementAndGet();
516
517         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
518             .whenComplete((result, error) -> {
519
520               aaiWorkOnHand.decrementAndGet();
521
522               if (error != null) {
523                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
524               } else {
525                 if (result == null) {
526                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
527                       linkDescriptor.getSelfLink());
528                 } else {
529                   updateActiveInventoryCounters(result);
530                   fetchDocumentForUpsert(result);
531                 }
532               }
533             });
534       }
535
536     }
537
538   }
539   
540   /**
541    * Fetch document for upsert.
542    *
543    * @param txn the txn
544    */
545   private void fetchDocumentForUpsert(NetworkTransaction txn) {
546     // modified
547     if (!txn.getOperationResult().wasSuccessful()) {
548       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
549       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
550       return;
551     }
552
553     try {
554         final String jsonResult = txn.getOperationResult().getResult();
555         if (jsonResult != null && jsonResult.length() > 0) {
556
557           AggregationEntity ae = new AggregationEntity(oxmModelLoader);
558           ae.setLink(ActiveInventoryConfig.extractResourcePath(txn.getLink()));
559           populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
560           ae.deriveFields();
561
562           String link = null;
563           try {
564             link = getElasticFullUrl("/" + ae.getId(), getIndexName());
565           } catch (Exception exc) {
566             LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
567           }
568
569           if (link != null) {
570             NetworkTransaction n2 = new NetworkTransaction();
571             n2.setLink(link);
572             n2.setEntityType(txn.getEntityType());
573             n2.setDescriptor(txn.getDescriptor());
574             n2.setOperationType(HttpMethod.GET);
575
576             esWorkOnHand.incrementAndGet();
577
578             supplyAsync(new PerformElasticSearchRetrieval(n2, esDataProvider), esExecutor)
579                 .whenComplete((result, error) -> {
580
581                   esWorkOnHand.decrementAndGet();
582
583                   if (error != null) {
584                     LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
585                   } else {
586                     updateElasticSearchCounters(result);
587                     performDocumentUpsert(result, ae);
588                   }
589                 });
590           }
591         }
592
593      } catch (JsonProcessingException exc) {
594       // TODO -> LOG, waht should be logged here?
595     } catch (IOException exc) {
596       // TODO -> LOG, waht should be logged here?
597     }
598   }
599   
600   
601   /**
602    * Populate aggregation entity document.
603    *
604    * @param doc the doc
605    * @param result the result
606    * @param resultDescriptor the result descriptor
607    * @throws JsonProcessingException the json processing exception
608    * @throws IOException Signals that an I/O exception has occurred.
609    */
610   protected void populateAggregationEntityDocument(AggregationEntity doc, String result,
611       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
612     doc.setEntityType(resultDescriptor.getEntityName());
613     JsonNode entityNode = mapper.readTree(result);
614     Map<String, Object> map = mapper.convertValue(entityNode, Map.class);
615     doc.copyAttributeKeyValuePair(map);
616   }
617   
618   /**
619    * Process entity type self links.
620    *
621    * @param operationResult the operation result
622    */
623   private void processEntityTypeSelfLinks(OperationResult operationResult) {
624
625     JsonNode rootNode = null;
626
627     final String jsonResult = operationResult.getResult();
628
629     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
630
631       try {
632         rootNode = mapper.readTree(jsonResult);
633       } catch (IOException exc) {
634         String message =
635             "Could not deserialize JSON (representing operation result) as node tree. " +
636             "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
637         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
638       }
639
640       JsonNode resultData = rootNode.get("result-data");
641       ArrayNode resultDataArrayNode = null;
642
643       if (resultData.isArray()) {
644         resultDataArrayNode = (ArrayNode) resultData;
645
646         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
647         JsonNode element = null;
648
649         while (elementIterator.hasNext()) {
650           element = elementIterator.next();
651
652           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
653           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
654
655           OxmEntityDescriptor descriptor = null;
656
657           if (resourceType != null && resourceLink != null) {
658
659             descriptor = oxmModelLoader.getEntityDescriptor(resourceType);
660
661             if (descriptor == null) {
662               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
663               // go to next element in iterator
664               continue;
665             }
666
667             selflinks.add(new SelfLinkDescriptor(resourceLink, SynchronizerConfiguration.NODES_ONLY_MODIFIER, resourceType));
668             
669
670           }
671         }
672       }
673     }
674
675   }
676
677   /*
678    * (non-Javadoc)
679    * 
680    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
681    */
682   @Override
683   public OperationState doSync() {
684         this.syncDurationInMs = -1;
685         syncStartedTimeStampInMs = System.currentTimeMillis();
686     String txnID = NodeUtils.getRandomTxnId();
687     MdcContext.initialize(txnID, "AggregationSynchronizer", "", "Sync", "");
688     
689     return collectAllTheWork();
690   }
691
692   @Override
693   public SynchronizerState getState() {
694
695     if (!isSyncDone()) {
696       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
697     }
698
699     return SynchronizerState.IDLE;
700
701   }
702
703   /*
704    * (non-Javadoc)
705    * 
706    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
707    */
708   @Override
709   public String getStatReport(boolean showFinalReport) {
710         syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
711         return getStatReport(syncDurationInMs, showFinalReport);
712   }
713
714   public String getEntityType() {
715     return entityType;
716   }
717
718   public void setEntityType(String entityType) {
719     this.entityType = entityType;
720   }
721
722   /*
723    * (non-Javadoc)
724    * 
725    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
726    */
727   @Override
728   public void shutdown() {
729     this.shutdownExecutors();
730   }
731
732   @Override
733   protected boolean isSyncDone() {
734
735     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
736
737     if (LOG.isDebugEnabled()) {
738       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
739           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
740     }
741
742     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
743       return false;
744     }
745
746     this.syncInProgress = false;
747
748     return true;
749   }
750
751   /*
752    * (non-Javadoc)
753    * 
754    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
755    */
756   @Override
757   public void clearCache() {
758
759     if (syncInProgress) {
760       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
761           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
762       return;
763     }
764
765     super.clearCache();
766     this.resetCounters();
767     if (entityCounters != null) {
768       entityCounters.clear();
769     }
770
771     allWorkEnumerated = false;
772
773   }
774
775 }