Update the dependencies to use project version
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / synchronizer / AggregationSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.synchronizer;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.io.IOException;
28 import java.net.InetAddress;
29 import java.net.UnknownHostException;
30 import java.sql.Timestamp;
31 import java.text.SimpleDateFormat;
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.Deque;
35 import java.util.EnumSet;
36 import java.util.HashMap;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Set;
42 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.ConcurrentLinkedDeque;
44 import java.util.concurrent.ExecutorService;
45 import java.util.concurrent.atomic.AtomicInteger;
46 import java.util.function.Supplier;
47
48 import javax.json.Json;
49
50 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
51 import org.onap.aai.sparky.dal.NetworkTransaction;
52 import org.onap.aai.sparky.dal.aai.config.ActiveInventoryConfig;
53 import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
54 import org.onap.aai.sparky.dal.rest.HttpMethod;
55 import org.onap.aai.sparky.dal.rest.OperationResult;
56 import org.onap.aai.sparky.logging.AaiUiMsgs;
57 import org.onap.aai.sparky.synchronizer.config.SynchronizerConfiguration;
58 import org.onap.aai.sparky.synchronizer.entity.AggregationEntity;
59 import org.onap.aai.sparky.synchronizer.entity.MergableEntity;
60 import org.onap.aai.sparky.synchronizer.entity.SelfLinkDescriptor;
61 import org.onap.aai.sparky.synchronizer.enumeration.OperationState;
62 import org.onap.aai.sparky.synchronizer.enumeration.SynchronizerState;
63 import org.onap.aai.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
64 import org.onap.aai.sparky.synchronizer.task.PerformElasticSearchPut;
65 import org.onap.aai.sparky.synchronizer.task.PerformElasticSearchRetrieval;
66 import org.onap.aai.sparky.synchronizer.task.PerformElasticSearchUpdate;
67 import org.onap.aai.sparky.util.NodeUtils;
68 import org.onap.aai.cl.api.Logger;
69 import org.onap.aai.cl.eelf.LoggerFactory;
70 import org.slf4j.MDC;
71
72 import org.onap.aai.cl.mdc.MdcContext;
73 import com.fasterxml.jackson.core.JsonProcessingException;
74 import com.fasterxml.jackson.databind.JsonNode;
75 import com.fasterxml.jackson.databind.ObjectReader;
76 import com.fasterxml.jackson.databind.node.ArrayNode;
77
78 /**
79  * The Class AutosuggestionSynchronizer.
80  */
81 public class AggregationSynchronizer extends AbstractEntitySynchronizer
82     implements IndexSynchronizer {
83
84   /**
85    * The Class RetryAggregationEntitySyncContainer.
86    */
87   private class RetryAggregationEntitySyncContainer {
88     NetworkTransaction txn;
89     AggregationEntity ae;
90
91     /**
92      * Instantiates a new retry aggregation entity sync container.
93      *
94      * @param txn the txn
95      * @param ae the se
96      */
97     public RetryAggregationEntitySyncContainer(NetworkTransaction txn, AggregationEntity ae) {
98       this.txn = txn;
99       this.ae = ae;
100     }
101
102     public NetworkTransaction getNetworkTransaction() {
103       return txn;
104     }
105
106     public AggregationEntity getAggregationEntity() {
107       return ae;
108     }
109   }
110
111   private static final Logger LOG =
112       LoggerFactory.getInstance().getLogger(AggregationSynchronizer.class);
113   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
114
115   private boolean allWorkEnumerated;
116   private Deque<SelfLinkDescriptor> selflinks;
117   private Deque<RetryAggregationEntitySyncContainer> retryQueue;
118   private Map<String, Integer> retryLimitTracker;
119   protected ExecutorService esPutExecutor;
120   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
121   private boolean syncInProgress;
122   private Map<String, String> contextMap;
123   private String entityType;
124
125   /**
126    * Instantiates a new entity aggregation synchronizer.
127    *
128    * @param indexName the index name
129    * @throws Exception the exception
130    */
131   public AggregationSynchronizer(String entityType, String indexName) throws Exception {
132     super(LOG, "AGGES-" + indexName.toUpperCase(), 2, 5, 5, indexName); // multiple Autosuggestion
133                                                                         // Entity Synchronizer will
134                                                                         // run for different indices
135
136     this.entityType = entityType;
137     this.allWorkEnumerated = false;
138     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
139     this.synchronizerName = "Entity Aggregation Synchronizer";
140     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
141     this.syncInProgress = false;
142     this.allWorkEnumerated = false;
143     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
144     this.retryQueue = new ConcurrentLinkedDeque<RetryAggregationEntitySyncContainer>();
145     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
146
147     this.esPutExecutor = NodeUtils.createNamedExecutor("AGGES-ES-PUT", 1, LOG);
148     Map<String, OxmEntityDescriptor> descriptor = new HashMap<String, OxmEntityDescriptor>();
149     descriptor.put(entityType, oxmModelLoader.getEntityDescriptors().get(entityType));
150     this.aaiEntityStats.initializeCountersFromOxmEntityDescriptors(descriptor);
151     this.esEntityStats.initializeCountersFromOxmEntityDescriptors(descriptor);
152     this.contextMap = MDC.getCopyOfContextMap();
153   }
154
155   /**
156    * Collect all the work.
157    *
158    * @return the operation state
159    */
160   private OperationState collectAllTheWork() {
161     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
162     final String entity = this.getEntityType();
163     try {
164
165       aaiWorkOnHand.set(1);
166
167       supplyAsync(new Supplier<Void>() {
168
169         @Override
170         public Void get() {
171           MDC.setContextMap(contextMap);
172           OperationResult typeLinksResult = null;
173           try {
174             typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(entity);
175             aaiWorkOnHand.decrementAndGet();
176             processEntityTypeSelfLinks(typeLinksResult);
177           } catch (Exception exc) {
178             // TODO -> LOG, what should be logged here?
179           }
180
181           return null;
182         }
183
184       }, aaiExecutor).whenComplete((result, error) -> {
185
186         if (error != null) {
187           LOG.error(AaiUiMsgs.ERROR_GENERIC,
188               "An error occurred getting data from AAI. Error = " + error.getMessage());
189         }
190       });
191
192       while (aaiWorkOnHand.get() != 0) {
193
194         if (LOG.isDebugEnabled()) {
195           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
196         }
197
198         Thread.sleep(1000);
199       }
200
201       aaiWorkOnHand.set(selflinks.size());
202       allWorkEnumerated = true;
203       syncEntityTypes();
204
205       while (!isSyncDone()) {
206         performRetrySync();
207         Thread.sleep(1000);
208       }
209
210       /*
211        * Make sure we don't hang on to retries that failed which could cause issues during future
212        * syncs
213        */
214       retryLimitTracker.clear();
215
216     } catch (Exception exc) {
217       // TODO -> LOG, waht should be logged here?
218     }
219
220     return OperationState.OK;
221   }
222
223
224   /**
225    * Perform retry sync.
226    */
227   private void performRetrySync() {
228     while (retryQueue.peek() != null) {
229
230       RetryAggregationEntitySyncContainer rsc = retryQueue.poll();
231       if (rsc != null) {
232
233         AggregationEntity ae = rsc.getAggregationEntity();
234         NetworkTransaction txn = rsc.getNetworkTransaction();
235
236         String link = null;
237         try {
238           /*
239            * In this retry flow the se object has already derived its fields
240            */
241           link = getElasticFullUrl("/" + ae.getId(), getIndexName());
242         } catch (Exception exc) {
243           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
244         }
245
246         if (link != null) {
247           NetworkTransaction retryTransaction = new NetworkTransaction();
248           retryTransaction.setLink(link);
249           retryTransaction.setEntityType(txn.getEntityType());
250           retryTransaction.setDescriptor(txn.getDescriptor());
251           retryTransaction.setOperationType(HttpMethod.GET);
252
253           /*
254            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
255            * called incrementAndGet when queuing the failed PUT!
256            */
257
258           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, esDataProvider),
259               esExecutor).whenComplete((result, error) -> {
260
261                 esWorkOnHand.decrementAndGet();
262
263                 if (error != null) {
264                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
265                 } else {
266                   updateElasticSearchCounters(result);
267                   performDocumentUpsert(result, ae);
268                 }
269               });
270         }
271
272       }
273     }
274   }
275
276   /**
277    * Perform document upsert.
278    *
279    * @param esGetTxn the es get txn
280    * @param ae the ae
281    */
282   protected void performDocumentUpsert(NetworkTransaction esGetTxn, AggregationEntity ae) {
283     /**
284      * <p>
285      * <ul>
286      * As part of the response processing we need to do the following:
287      * <li>1. Extract the version (if present), it will be the ETAG when we use the
288      * Search-Abstraction-Service
289      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
290      * tag
291      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
292      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
293      * </ul>
294      * </p>
295      */
296     String link = null;
297     try {
298       link = getElasticFullUrl("/" + ae.getId(), getIndexName());
299     } catch (Exception exc) {
300       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
301       return;
302     }
303
304     String versionNumber = null;
305     boolean wasEntryDiscovered = false;
306     if (esGetTxn.getOperationResult().getResultCode() == 404) {
307       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, ae.getEntityPrimaryKeyValue());
308     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
309       wasEntryDiscovered = true;
310       try {
311         versionNumber = NodeUtils.extractFieldValueFromObject(
312             NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
313             "_version");
314       } catch (IOException exc) {
315         String message =
316             "Error extracting version number from response, aborting aggregation entity sync of "
317                 + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
318         LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
319         return;
320       }
321     } else {
322       /*
323        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
324        * return.
325        */
326       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
327           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
328       return;
329     }
330
331     try {
332       String jsonPayload = null;
333       if (wasEntryDiscovered) {
334         try {
335           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
336           NodeUtils.extractObjectsByKey(
337               NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
338               "_source", sourceObject);
339
340           if (!sourceObject.isEmpty()) {
341             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
342             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
343             ObjectReader updater = mapper.readerForUpdating(me);
344             MergableEntity merged = updater.readValue(ae.getIndexDocumentJson());
345             jsonPayload = mapper.writeValueAsString(merged);
346           }
347         } catch (IOException exc) {
348           String message =
349               "Error extracting source value from response, aborting aggregation entity sync of "
350                   + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
351           LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
352           return;
353         }
354       } else {
355         jsonPayload = ae.getIndexDocumentJson();
356       }
357
358       if (wasEntryDiscovered) {
359         if (versionNumber != null && jsonPayload != null) {
360
361           String requestPayload = esDataProvider.buildBulkImportOperationRequest(getIndexName(),
362               ElasticSearchConfig.getConfig().getType(), ae.getId(), versionNumber, jsonPayload);
363
364           NetworkTransaction transactionTracker = new NetworkTransaction();
365           transactionTracker.setEntityType(esGetTxn.getEntityType());
366           transactionTracker.setDescriptor(esGetTxn.getDescriptor());
367           transactionTracker.setOperationType(HttpMethod.PUT);
368
369           esWorkOnHand.incrementAndGet();
370           supplyAsync(new PerformElasticSearchUpdate(ElasticSearchConfig.getConfig().getBulkUrl(),
371               requestPayload, esDataProvider, transactionTracker), esPutExecutor)
372                   .whenComplete((result, error) -> {
373
374                     esWorkOnHand.decrementAndGet();
375
376                     if (error != null) {
377                       String message = "Aggregation entity sync UPDATE PUT error - "
378                           + error.getLocalizedMessage();
379                       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
380                     } else {
381                       updateElasticSearchCounters(result);
382                       processStoreDocumentResult(result, esGetTxn, ae);
383                     }
384                   });
385         }
386
387       } else {
388         if (link != null && jsonPayload != null) {
389
390           NetworkTransaction updateElasticTxn = new NetworkTransaction();
391           updateElasticTxn.setLink(link);
392           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
393           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
394           updateElasticTxn.setOperationType(HttpMethod.PUT);
395
396           esWorkOnHand.incrementAndGet();
397           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, esDataProvider),
398               esPutExecutor).whenComplete((result, error) -> {
399
400                 esWorkOnHand.decrementAndGet();
401
402                 if (error != null) {
403                   String message =
404                       "Aggregation entity sync UPDATE PUT error - " + error.getLocalizedMessage();
405                   LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
406                 } else {
407                   updateElasticSearchCounters(result);
408                   processStoreDocumentResult(result, esGetTxn, ae);
409                 }
410               });
411         }
412       }
413     } catch (Exception exc) {
414       String message = "Exception caught during aggregation entity sync PUT operation. Message - "
415           + exc.getLocalizedMessage();
416       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
417     }
418   }
419
420   /**
421    * Should allow retry.
422    *
423    * @param id the id
424    * @return true, if successful
425    */
426   private boolean shouldAllowRetry(String id) {
427     boolean isRetryAllowed = true;
428     if (retryLimitTracker.get(id) != null) {
429       Integer currentCount = retryLimitTracker.get(id);
430       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
431         isRetryAllowed = false;
432         String message = "Aggregation entity re-sync limit reached for " + id
433             + ", re-sync will no longer be attempted for this entity";
434         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
435       } else {
436         Integer newCount = new Integer(currentCount.intValue() + 1);
437         retryLimitTracker.put(id, newCount);
438       }
439     } else {
440       Integer firstRetryCount = new Integer(1);
441       retryLimitTracker.put(id, firstRetryCount);
442     }
443
444     return isRetryAllowed;
445   }
446
447   /**
448    * Process store document result.
449    *
450    * @param esPutResult the es put result
451    * @param esGetResult the es get result
452    * @param ae the ae
453    */
454   private void processStoreDocumentResult(NetworkTransaction esPutResult,
455       NetworkTransaction esGetResult, AggregationEntity ae) {
456
457     OperationResult or = esPutResult.getOperationResult();
458
459     if (!or.wasSuccessful()) {
460       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
461
462         if (shouldAllowRetry(ae.getId())) {
463           esWorkOnHand.incrementAndGet();
464
465           RetryAggregationEntitySyncContainer rsc =
466               new RetryAggregationEntitySyncContainer(esGetResult, ae);
467           retryQueue.push(rsc);
468
469           String message = "Store document failed during aggregation entity synchronization"
470               + " due to version conflict. Entity will be re-synced.";
471           LOG.warn(AaiUiMsgs.ERROR_GENERIC, message);
472         }
473       } else {
474         String message =
475             "Store document failed during aggregation entity synchronization with result code "
476                 + or.getResultCode() + " and result message " + or.getResult();
477         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
478       }
479     }
480   }
481
482   /**
483    * Sync entity types.
484    */
485   private void syncEntityTypes() {
486
487     while (selflinks.peek() != null) {
488
489       SelfLinkDescriptor linkDescriptor = selflinks.poll();
490       aaiWorkOnHand.decrementAndGet();
491
492       OxmEntityDescriptor descriptor = null;
493
494       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
495
496         descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
497
498         if (descriptor == null) {
499           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
500           // go to next element in iterator
501           continue;
502         }
503
504         NetworkTransaction txn = new NetworkTransaction();
505         txn.setDescriptor(descriptor);
506         txn.setLink(linkDescriptor.getSelfLink());
507         txn.setOperationType(HttpMethod.GET);
508         txn.setEntityType(linkDescriptor.getEntityType());
509
510         aaiWorkOnHand.incrementAndGet();
511
512         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
513             .whenComplete((result, error) -> {
514
515               aaiWorkOnHand.decrementAndGet();
516
517               if (error != null) {
518                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
519               } else {
520                 if (result == null) {
521                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
522                       linkDescriptor.getSelfLink());
523                 } else {
524                   updateActiveInventoryCounters(result);
525                   fetchDocumentForUpsert(result);
526                 }
527               }
528             });
529       }
530
531     }
532
533   }
534
535   /**
536    * Fetch document for upsert.
537    *
538    * @param txn the txn
539    */
540   private void fetchDocumentForUpsert(NetworkTransaction txn) {
541     // modified
542     if (!txn.getOperationResult().wasSuccessful()) {
543       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
544       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
545       return;
546     }
547
548     try {
549       final String jsonResult = txn.getOperationResult().getResult();
550       if (jsonResult != null && jsonResult.length() > 0) {
551
552         AggregationEntity ae = new AggregationEntity(oxmModelLoader);
553         ae.setLink(ActiveInventoryConfig.extractResourcePath(txn.getLink()));
554         populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
555         ae.deriveFields();
556
557         String link = null;
558         try {
559           link = getElasticFullUrl("/" + ae.getId(), getIndexName());
560         } catch (Exception exc) {
561           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
562         }
563
564         if (link != null) {
565           NetworkTransaction n2 = new NetworkTransaction();
566           n2.setLink(link);
567           n2.setEntityType(txn.getEntityType());
568           n2.setDescriptor(txn.getDescriptor());
569           n2.setOperationType(HttpMethod.GET);
570
571           esWorkOnHand.incrementAndGet();
572
573           supplyAsync(new PerformElasticSearchRetrieval(n2, esDataProvider), esExecutor)
574               .whenComplete((result, error) -> {
575
576                 esWorkOnHand.decrementAndGet();
577
578                 if (error != null) {
579                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
580                 } else {
581                   updateElasticSearchCounters(result);
582                   performDocumentUpsert(result, ae);
583                 }
584               });
585         }
586       }
587
588     } catch (JsonProcessingException exc) {
589       // TODO -> LOG, waht should be logged here?
590     } catch (IOException exc) {
591       // TODO -> LOG, waht should be logged here?
592     }
593   }
594
595
596   /**
597    * Populate aggregation entity document.
598    *
599    * @param doc the doc
600    * @param result the result
601    * @param resultDescriptor the result descriptor
602    * @throws JsonProcessingException the json processing exception
603    * @throws IOException Signals that an I/O exception has occurred.
604    */
605   protected void populateAggregationEntityDocument(AggregationEntity doc, String result,
606       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
607     doc.setEntityType(resultDescriptor.getEntityName());
608     JsonNode entityNode = mapper.readTree(result);
609     Map<String, Object> map = mapper.convertValue(entityNode, Map.class);
610     doc.copyAttributeKeyValuePair(map);
611   }
612
613   /**
614    * Process entity type self links.
615    *
616    * @param operationResult the operation result
617    */
618   private void processEntityTypeSelfLinks(OperationResult operationResult) {
619
620     JsonNode rootNode = null;
621
622     final String jsonResult = operationResult.getResult();
623
624     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
625
626       try {
627         rootNode = mapper.readTree(jsonResult);
628       } catch (IOException exc) {
629         String message = "Could not deserialize JSON (representing operation result) as node tree. "
630             + "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
631         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
632         return;
633       }
634
635       JsonNode resultData = rootNode.get("result-data");
636       ArrayNode resultDataArrayNode = null;
637
638       if (resultData.isArray()) {
639         resultDataArrayNode = (ArrayNode) resultData;
640
641         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
642         JsonNode element = null;
643
644         while (elementIterator.hasNext()) {
645           element = elementIterator.next();
646
647           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
648           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
649
650           OxmEntityDescriptor descriptor = null;
651
652           if (resourceType != null && resourceLink != null) {
653
654             descriptor = oxmModelLoader.getEntityDescriptor(resourceType);
655
656             if (descriptor == null) {
657               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
658               // go to next element in iterator
659               continue;
660             }
661
662             selflinks.add(new SelfLinkDescriptor(resourceLink,
663                 SynchronizerConfiguration.NODES_ONLY_MODIFIER, resourceType));
664
665
666           }
667         }
668       }
669     }
670
671   }
672
673   /*
674    * (non-Javadoc)
675    * 
676    * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#doSync()
677    */
678   @Override
679   public OperationState doSync() {
680     this.syncDurationInMs = -1;
681     syncStartedTimeStampInMs = System.currentTimeMillis();
682     String txnID = NodeUtils.getRandomTxnId();
683     MdcContext.initialize(txnID, "AggregationSynchronizer", "", "Sync", "");
684
685     return collectAllTheWork();
686   }
687
688   @Override
689   public SynchronizerState getState() {
690
691     if (!isSyncDone()) {
692       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
693     }
694
695     return SynchronizerState.IDLE;
696
697   }
698
699   /*
700    * (non-Javadoc)
701    * 
702    * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
703    */
704   @Override
705   public String getStatReport(boolean showFinalReport) {
706     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
707     return getStatReport(syncDurationInMs, showFinalReport);
708   }
709
710   public String getEntityType() {
711     return entityType;
712   }
713
714   public void setEntityType(String entityType) {
715     this.entityType = entityType;
716   }
717
718   /*
719    * (non-Javadoc)
720    * 
721    * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#shutdown()
722    */
723   @Override
724   public void shutdown() {
725     this.shutdownExecutors();
726   }
727
728   @Override
729   protected boolean isSyncDone() {
730
731     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
732
733     if (LOG.isDebugEnabled()) {
734       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
735           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
736     }
737
738     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
739       return false;
740     }
741
742     this.syncInProgress = false;
743
744     return true;
745   }
746
747   /*
748    * (non-Javadoc)
749    * 
750    * @see org.onap.aai.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
751    */
752   @Override
753   public void clearCache() {
754
755     if (syncInProgress) {
756       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
757           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
758       return;
759     }
760
761     super.clearCache();
762     this.resetCounters();
763     if (entityCounters != null) {
764       entityCounters.clear();
765     }
766
767     allWorkEnumerated = false;
768
769   }
770
771 }