Changing the license and trademark
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / AggregationSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.openecomp.sparky.synchronizer;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.io.IOException;
28 import java.net.InetAddress;
29 import java.net.UnknownHostException;
30 import java.sql.Timestamp;
31 import java.text.SimpleDateFormat;
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.Deque;
35 import java.util.EnumSet;
36 import java.util.HashMap;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Set;
42 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.ConcurrentLinkedDeque;
44 import java.util.concurrent.ExecutorService;
45 import java.util.concurrent.atomic.AtomicInteger;
46 import java.util.function.Supplier;
47
48 import javax.json.Json;
49
50 import org.openecomp.cl.api.Logger;
51 import org.openecomp.cl.eelf.LoggerFactory;
52 import org.openecomp.sparky.config.oxm.OxmEntityDescriptor;
53 import org.openecomp.sparky.dal.NetworkTransaction;
54 import org.openecomp.sparky.dal.aai.config.ActiveInventoryConfig;
55 import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig;
56 import org.openecomp.sparky.dal.rest.HttpMethod;
57 import org.openecomp.sparky.dal.rest.OperationResult;
58 import org.openecomp.sparky.logging.AaiUiMsgs;
59 import org.openecomp.sparky.synchronizer.config.SynchronizerConfiguration;
60 import org.openecomp.sparky.synchronizer.entity.AggregationEntity;
61 import org.openecomp.sparky.synchronizer.entity.MergableEntity;
62 import org.openecomp.sparky.synchronizer.entity.SelfLinkDescriptor;
63 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
64 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
65 import org.openecomp.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
66 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchPut;
67 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchRetrieval;
68 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchUpdate;
69 import org.openecomp.sparky.util.NodeUtils;
70 import org.slf4j.MDC;
71
72 import org.openecomp.cl.mdc.MdcContext;
73 import com.fasterxml.jackson.core.JsonProcessingException;
74 import com.fasterxml.jackson.databind.JsonNode;
75 import com.fasterxml.jackson.databind.ObjectReader;
76 import com.fasterxml.jackson.databind.node.ArrayNode;
77
78 /**
79  * The Class AutosuggestionSynchronizer.
80  */
81 public class AggregationSynchronizer extends AbstractEntitySynchronizer
82     implements IndexSynchronizer {
83
84   /**
85    * The Class RetryAggregationEntitySyncContainer.
86    */
87   private class RetryAggregationEntitySyncContainer {
88     NetworkTransaction txn;
89     AggregationEntity ae;
90
91     /**
92      * Instantiates a new retry aggregation entity sync container.
93      *
94      * @param txn the txn
95      * @param ae the se
96      */
97     public RetryAggregationEntitySyncContainer(NetworkTransaction txn, AggregationEntity ae) {
98       this.txn = txn;
99       this.ae = ae;
100     }
101
102     public NetworkTransaction getNetworkTransaction() {
103       return txn;
104     }
105
106     public AggregationEntity getAggregationEntity() {
107       return ae;
108     }
109   }
110   
111   private static final Logger LOG =
112       LoggerFactory.getInstance().getLogger(AggregationSynchronizer.class);
113   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
114
115   private boolean allWorkEnumerated;
116   private Deque<SelfLinkDescriptor> selflinks;
117   private Deque<RetryAggregationEntitySyncContainer> retryQueue;
118   private Map<String, Integer> retryLimitTracker;
119   protected ExecutorService esPutExecutor;
120   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
121   private boolean syncInProgress;
122   private Map<String, String> contextMap;
123   private String entityType;
124
125   /**
126    * Instantiates a new entity aggregation synchronizer.
127    *
128    * @param indexName the index name
129    * @throws Exception the exception
130    */
131   public AggregationSynchronizer(String entityType, String indexName) throws Exception {
132     super(LOG, "AGGES-" + indexName.toUpperCase(), 2, 5, 5, indexName); // multiple Autosuggestion
133                                                                        // Entity Synchronizer will
134                                                                        // run for different indices
135     
136     this.entityType = entityType;
137     this.allWorkEnumerated = false;
138     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
139     this.synchronizerName = "Entity Aggregation Synchronizer";
140     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
141     this.syncInProgress = false;
142     this.allWorkEnumerated = false;
143     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
144     this.retryQueue = new ConcurrentLinkedDeque<RetryAggregationEntitySyncContainer>();
145     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
146     
147     this.esPutExecutor = NodeUtils.createNamedExecutor("AGGES-ES-PUT", 1, LOG);
148     Map<String, OxmEntityDescriptor> descriptor = new HashMap<String, OxmEntityDescriptor>();
149     descriptor.put(entityType, oxmModelLoader.getEntityDescriptors().get(entityType));
150     this.aaiEntityStats.initializeCountersFromOxmEntityDescriptors(
151         descriptor);
152     this.esEntityStats.initializeCountersFromOxmEntityDescriptors(
153         descriptor);
154     this.contextMap = MDC.getCopyOfContextMap();
155   }
156
157   /**
158    * Collect all the work.
159    *
160    * @return the operation state
161    */
162   private OperationState collectAllTheWork() {
163     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
164     final String entity = this.getEntityType();
165     try {
166
167       aaiWorkOnHand.set(1);
168
169       supplyAsync(new Supplier<Void>() {
170
171         @Override
172         public Void get() {
173           MDC.setContextMap(contextMap);
174           OperationResult typeLinksResult = null;
175           try {
176             typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(entity);
177             aaiWorkOnHand.decrementAndGet();
178             processEntityTypeSelfLinks(typeLinksResult);
179           } catch (Exception exc) {
180             // TODO -> LOG, what should be logged here?
181           }
182
183           return null;
184         }
185
186       }, aaiExecutor).whenComplete((result, error) -> {
187
188         if (error != null) {
189           LOG.error(AaiUiMsgs.ERROR_GENERIC,
190               "An error occurred getting data from AAI. Error = " + error.getMessage());
191         }
192       });
193
194       while (aaiWorkOnHand.get() != 0) {
195
196         if (LOG.isDebugEnabled()) {
197           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
198         }
199
200         Thread.sleep(1000);
201       }
202
203       aaiWorkOnHand.set(selflinks.size());
204       allWorkEnumerated = true;
205       syncEntityTypes();
206
207       while (!isSyncDone()) {
208         performRetrySync();
209         Thread.sleep(1000);
210       }
211
212       /*
213        * Make sure we don't hang on to retries that failed which could cause issues during future
214        * syncs
215        */
216       retryLimitTracker.clear();
217
218     } catch (Exception exc) {
219       // TODO -> LOG, waht should be logged here?
220     }
221
222     return OperationState.OK;
223   }
224   
225   
226   /**
227    * Perform retry sync.
228    */
229   private void performRetrySync() {
230     while (retryQueue.peek() != null) {
231
232       RetryAggregationEntitySyncContainer rsc = retryQueue.poll();
233       if (rsc != null) {
234
235         AggregationEntity ae = rsc.getAggregationEntity();
236         NetworkTransaction txn = rsc.getNetworkTransaction();
237
238         String link = null;
239         try {
240           /*
241            * In this retry flow the se object has already derived its fields
242            */
243           link = getElasticFullUrl("/" + ae.getId(), getIndexName());
244         } catch (Exception exc) {
245           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
246         }
247
248         if (link != null) {
249           NetworkTransaction retryTransaction = new NetworkTransaction();
250           retryTransaction.setLink(link);
251           retryTransaction.setEntityType(txn.getEntityType());
252           retryTransaction.setDescriptor(txn.getDescriptor());
253           retryTransaction.setOperationType(HttpMethod.GET);
254
255           /*
256            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
257            * called incrementAndGet when queuing the failed PUT!
258            */
259
260           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, esDataProvider),
261               esExecutor).whenComplete((result, error) -> {
262
263                 esWorkOnHand.decrementAndGet();
264
265                 if (error != null) {
266                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
267                 } else {
268                   updateElasticSearchCounters(result);
269                   performDocumentUpsert(result, ae);
270                 }
271               });
272         }
273
274       }
275     }
276   }
277   
278   /**
279    * Perform document upsert.
280    *
281    * @param esGetTxn the es get txn
282    * @param ae the ae
283    */
284   protected void performDocumentUpsert(NetworkTransaction esGetTxn, AggregationEntity ae) {
285     /**
286      * <p>
287      * <ul>
288      * As part of the response processing we need to do the following:
289      * <li>1. Extract the version (if present), it will be the ETAG when we use the
290      * Search-Abstraction-Service
291      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
292      * tag
293      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
294      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
295      * </ul>
296      * </p>
297      */
298     String link = null;
299     try {
300       link = getElasticFullUrl("/" + ae.getId(), getIndexName());
301     } catch (Exception exc) {
302       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
303       return;
304     }
305
306     String versionNumber = null;
307     boolean wasEntryDiscovered = false;
308     if (esGetTxn.getOperationResult().getResultCode() == 404) {
309       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, ae.getEntityPrimaryKeyValue());
310     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
311       wasEntryDiscovered = true;
312       try {
313         versionNumber = NodeUtils.extractFieldValueFromObject(
314             NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
315             "_version");
316       } catch (IOException exc) {
317         String message =
318             "Error extracting version number from response, aborting aggregation entity sync of "
319                 + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
320         LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
321         return;
322       }
323     } else {
324       /*
325        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
326        * return.
327        */
328       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
329           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
330       return;
331     }
332
333     try {
334       String jsonPayload = null;
335       if (wasEntryDiscovered) {
336         try {
337           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
338           NodeUtils.extractObjectsByKey(
339               NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
340               "_source", sourceObject);
341
342           if (!sourceObject.isEmpty()) {
343             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
344             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
345             ObjectReader updater = mapper.readerForUpdating(me);
346             MergableEntity merged = updater.readValue(ae.getIndexDocumentJson());
347             jsonPayload = mapper.writeValueAsString(merged);
348           }
349         } catch (IOException exc) {
350           String message =
351               "Error extracting source value from response, aborting aggregation entity sync of "
352                   + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
353           LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
354           return;
355         }
356       } else {
357         jsonPayload = ae.getIndexDocumentJson();
358       }
359
360       if (wasEntryDiscovered) {
361         if (versionNumber != null && jsonPayload != null) {
362
363           String requestPayload = esDataProvider.buildBulkImportOperationRequest(getIndexName(),
364               ElasticSearchConfig.getConfig().getType(), ae.getId(), versionNumber, jsonPayload);
365
366           NetworkTransaction transactionTracker = new NetworkTransaction();
367           transactionTracker.setEntityType(esGetTxn.getEntityType());
368           transactionTracker.setDescriptor(esGetTxn.getDescriptor());
369           transactionTracker.setOperationType(HttpMethod.PUT);
370
371           esWorkOnHand.incrementAndGet();
372           supplyAsync(new PerformElasticSearchUpdate(ElasticSearchConfig.getConfig().getBulkUrl(),
373               requestPayload, esDataProvider, transactionTracker), esPutExecutor)
374                   .whenComplete((result, error) -> {
375
376                     esWorkOnHand.decrementAndGet();
377
378                     if (error != null) {
379                       String message = "Aggregation entity sync UPDATE PUT error - "
380                           + error.getLocalizedMessage();
381                       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
382                     } else {
383                       updateElasticSearchCounters(result);
384                       processStoreDocumentResult(result, esGetTxn, ae);
385                     }
386                   });
387         }
388
389       } else {
390         if (link != null && jsonPayload != null) {
391
392           NetworkTransaction updateElasticTxn = new NetworkTransaction();
393           updateElasticTxn.setLink(link);
394           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
395           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
396           updateElasticTxn.setOperationType(HttpMethod.PUT);
397
398           esWorkOnHand.incrementAndGet();
399           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, esDataProvider),
400               esPutExecutor).whenComplete((result, error) -> {
401
402                 esWorkOnHand.decrementAndGet();
403
404                 if (error != null) {
405                   String message =
406                       "Aggregation entity sync UPDATE PUT error - " + error.getLocalizedMessage();
407                   LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
408                 } else {
409                   updateElasticSearchCounters(result);
410                   processStoreDocumentResult(result, esGetTxn, ae);
411                 }
412               });
413         }
414       }
415     } catch (Exception exc) {
416       String message = "Exception caught during aggregation entity sync PUT operation. Message - "
417           + exc.getLocalizedMessage();
418       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
419     }
420   }
421   
422   /**
423    * Should allow retry.
424    *
425    * @param id the id
426    * @return true, if successful
427    */
428   private boolean shouldAllowRetry(String id) {
429     boolean isRetryAllowed = true;
430     if (retryLimitTracker.get(id) != null) {
431       Integer currentCount = retryLimitTracker.get(id);
432       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
433         isRetryAllowed = false;
434         String message = "Aggregation entity re-sync limit reached for " + id
435             + ", re-sync will no longer be attempted for this entity";
436         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
437       } else {
438         Integer newCount = new Integer(currentCount.intValue() + 1);
439         retryLimitTracker.put(id, newCount);
440       }
441     } else {
442       Integer firstRetryCount = new Integer(1);
443       retryLimitTracker.put(id, firstRetryCount);
444     }
445
446     return isRetryAllowed;
447   }
448   
449   /**
450    * Process store document result.
451    *
452    * @param esPutResult the es put result
453    * @param esGetResult the es get result
454    * @param ae the ae
455    */
456   private void processStoreDocumentResult(NetworkTransaction esPutResult,
457       NetworkTransaction esGetResult, AggregationEntity ae) {
458
459     OperationResult or = esPutResult.getOperationResult();
460
461     if (!or.wasSuccessful()) {
462       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
463
464         if (shouldAllowRetry(ae.getId())) {
465           esWorkOnHand.incrementAndGet();
466
467           RetryAggregationEntitySyncContainer rsc =
468               new RetryAggregationEntitySyncContainer(esGetResult, ae);
469           retryQueue.push(rsc);
470
471           String message = "Store document failed during aggregation entity synchronization"
472               + " due to version conflict. Entity will be re-synced.";
473           LOG.warn(AaiUiMsgs.ERROR_GENERIC, message);
474         }
475       } else {
476         String message =
477             "Store document failed during aggregation entity synchronization with result code "
478                 + or.getResultCode() + " and result message " + or.getResult();
479         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
480       }
481     }
482   }
483   
484   /**
485    * Sync entity types.
486    */
487   private void syncEntityTypes() {
488
489     while (selflinks.peek() != null) {
490
491       SelfLinkDescriptor linkDescriptor = selflinks.poll();
492       aaiWorkOnHand.decrementAndGet();
493
494       OxmEntityDescriptor descriptor = null;
495
496       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
497
498         descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
499
500         if (descriptor == null) {
501           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
502           // go to next element in iterator
503           continue;
504         }
505
506         NetworkTransaction txn = new NetworkTransaction();
507         txn.setDescriptor(descriptor);
508         txn.setLink(linkDescriptor.getSelfLink());
509         txn.setOperationType(HttpMethod.GET);
510         txn.setEntityType(linkDescriptor.getEntityType());
511
512         aaiWorkOnHand.incrementAndGet();
513
514         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
515             .whenComplete((result, error) -> {
516
517               aaiWorkOnHand.decrementAndGet();
518
519               if (error != null) {
520                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
521               } else {
522                 if (result == null) {
523                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
524                       linkDescriptor.getSelfLink());
525                 } else {
526                   updateActiveInventoryCounters(result);
527                   fetchDocumentForUpsert(result);
528                 }
529               }
530             });
531       }
532
533     }
534
535   }
536   
537   /**
538    * Fetch document for upsert.
539    *
540    * @param txn the txn
541    */
542   private void fetchDocumentForUpsert(NetworkTransaction txn) {
543     // modified
544     if (!txn.getOperationResult().wasSuccessful()) {
545       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
546       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
547       return;
548     }
549
550     try {
551         final String jsonResult = txn.getOperationResult().getResult();
552         if (jsonResult != null && jsonResult.length() > 0) {
553
554           AggregationEntity ae = new AggregationEntity(oxmModelLoader);
555           ae.setLink(ActiveInventoryConfig.extractResourcePath(txn.getLink()));
556           populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
557           ae.deriveFields();
558
559           String link = null;
560           try {
561             link = getElasticFullUrl("/" + ae.getId(), getIndexName());
562           } catch (Exception exc) {
563             LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
564           }
565
566           if (link != null) {
567             NetworkTransaction n2 = new NetworkTransaction();
568             n2.setLink(link);
569             n2.setEntityType(txn.getEntityType());
570             n2.setDescriptor(txn.getDescriptor());
571             n2.setOperationType(HttpMethod.GET);
572
573             esWorkOnHand.incrementAndGet();
574
575             supplyAsync(new PerformElasticSearchRetrieval(n2, esDataProvider), esExecutor)
576                 .whenComplete((result, error) -> {
577
578                   esWorkOnHand.decrementAndGet();
579
580                   if (error != null) {
581                     LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
582                   } else {
583                     updateElasticSearchCounters(result);
584                     performDocumentUpsert(result, ae);
585                   }
586                 });
587           }
588         }
589
590      } catch (JsonProcessingException exc) {
591       // TODO -> LOG, waht should be logged here?
592     } catch (IOException exc) {
593       // TODO -> LOG, waht should be logged here?
594     }
595   }
596   
597   
598   /**
599    * Populate aggregation entity document.
600    *
601    * @param doc the doc
602    * @param result the result
603    * @param resultDescriptor the result descriptor
604    * @throws JsonProcessingException the json processing exception
605    * @throws IOException Signals that an I/O exception has occurred.
606    */
607   protected void populateAggregationEntityDocument(AggregationEntity doc, String result,
608       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
609     doc.setEntityType(resultDescriptor.getEntityName());
610     JsonNode entityNode = mapper.readTree(result);
611     Map<String, Object> map = mapper.convertValue(entityNode, Map.class);
612     doc.copyAttributeKeyValuePair(map);
613   }
614   
615   /**
616    * Process entity type self links.
617    *
618    * @param operationResult the operation result
619    */
620   private void processEntityTypeSelfLinks(OperationResult operationResult) {
621
622     JsonNode rootNode = null;
623
624     final String jsonResult = operationResult.getResult();
625
626     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
627
628       try {
629         rootNode = mapper.readTree(jsonResult);
630       } catch (IOException exc) {
631         String message =
632             "Could not deserialize JSON (representing operation result) as node tree. " +
633             "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
634         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
635       }
636
637       JsonNode resultData = rootNode.get("result-data");
638       ArrayNode resultDataArrayNode = null;
639
640       if (resultData.isArray()) {
641         resultDataArrayNode = (ArrayNode) resultData;
642
643         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
644         JsonNode element = null;
645
646         while (elementIterator.hasNext()) {
647           element = elementIterator.next();
648
649           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
650           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
651
652           OxmEntityDescriptor descriptor = null;
653
654           if (resourceType != null && resourceLink != null) {
655
656             descriptor = oxmModelLoader.getEntityDescriptor(resourceType);
657
658             if (descriptor == null) {
659               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
660               // go to next element in iterator
661               continue;
662             }
663
664             selflinks.add(new SelfLinkDescriptor(resourceLink, SynchronizerConfiguration.NODES_ONLY_MODIFIER, resourceType));
665             
666
667           }
668         }
669       }
670     }
671
672   }
673
674   /*
675    * (non-Javadoc)
676    * 
677    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
678    */
679   @Override
680   public OperationState doSync() {
681         this.syncDurationInMs = -1;
682         syncStartedTimeStampInMs = System.currentTimeMillis();
683     String txnID = NodeUtils.getRandomTxnId();
684     MdcContext.initialize(txnID, "AggregationSynchronizer", "", "Sync", "");
685     
686     return collectAllTheWork();
687   }
688
689   @Override
690   public SynchronizerState getState() {
691
692     if (!isSyncDone()) {
693       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
694     }
695
696     return SynchronizerState.IDLE;
697
698   }
699
700   /*
701    * (non-Javadoc)
702    * 
703    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
704    */
705   @Override
706   public String getStatReport(boolean showFinalReport) {
707         syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
708         return getStatReport(syncDurationInMs, showFinalReport);
709   }
710
711   public String getEntityType() {
712     return entityType;
713   }
714
715   public void setEntityType(String entityType) {
716     this.entityType = entityType;
717   }
718
719   /*
720    * (non-Javadoc)
721    * 
722    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
723    */
724   @Override
725   public void shutdown() {
726     this.shutdownExecutors();
727   }
728
729   @Override
730   protected boolean isSyncDone() {
731
732     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
733
734     if (LOG.isDebugEnabled()) {
735       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
736           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
737     }
738
739     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
740       return false;
741     }
742
743     this.syncInProgress = false;
744
745     return true;
746   }
747
748   /*
749    * (non-Javadoc)
750    * 
751    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
752    */
753   @Override
754   public void clearCache() {
755
756     if (syncInProgress) {
757       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
758           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
759       return;
760     }
761
762     super.clearCache();
763     this.resetCounters();
764     if (entityCounters != null) {
765       entityCounters.clear();
766     }
767
768     allWorkEnumerated = false;
769
770   }
771
772 }