Convert Sparky to Spring-Boot
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / aggregation / sync / AggregationSynchronizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25 package org.onap.aai.sparky.aggregation.sync;
26
27 import static java.util.concurrent.CompletableFuture.supplyAsync;
28
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.Deque;
32 import java.util.EnumSet;
33 import java.util.Iterator;
34 import java.util.Map;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ConcurrentLinkedDeque;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.function.Supplier;
40
41 import org.onap.aai.cl.api.Logger;
42 import org.onap.aai.cl.eelf.LoggerFactory;
43 import org.onap.aai.cl.mdc.MdcContext;
44 import org.onap.aai.restclient.client.OperationResult;
45 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
46 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
47 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
48 import org.onap.aai.sparky.dal.NetworkTransaction;
49 import org.onap.aai.sparky.dal.rest.HttpMethod;
50 import org.onap.aai.sparky.logging.AaiUiMsgs;
51 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
52 import org.onap.aai.sparky.sync.IndexSynchronizer;
53 import org.onap.aai.sparky.sync.SynchronizerConstants;
54 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
55 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
56 import org.onap.aai.sparky.sync.entity.AggregationEntity;
57 import org.onap.aai.sparky.sync.entity.MergableEntity;
58 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
59 import org.onap.aai.sparky.sync.enumeration.OperationState;
60 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
61 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
62 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
63 import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
64 import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
65 import org.onap.aai.sparky.util.NodeUtils;
66 import org.slf4j.MDC;
67
68 import com.fasterxml.jackson.core.JsonProcessingException;
69 import com.fasterxml.jackson.databind.JsonNode;
70 import com.fasterxml.jackson.databind.ObjectReader;
71 import com.fasterxml.jackson.databind.node.ArrayNode;
72
73 /**
74  * The Class AutosuggestionSynchronizer.
75  */
76 public class AggregationSynchronizer extends AbstractEntitySynchronizer
77     implements IndexSynchronizer {
78
79   /**
80    * The Class RetryAggregationEntitySyncContainer.
81    */
82   private class RetryAggregationEntitySyncContainer {
83     NetworkTransaction txn;
84     AggregationEntity ae;
85
86     /**
87      * Instantiates a new retry aggregation entity sync container.
88      *
89      * @param txn the txn
90      * @param ae the se
91      */
92     public RetryAggregationEntitySyncContainer(NetworkTransaction txn, AggregationEntity ae) {
93       this.txn = txn;
94       this.ae = ae;
95     }
96
97     public NetworkTransaction getNetworkTransaction() {
98       return txn;
99     }
100
101     public AggregationEntity getAggregationEntity() {
102       return ae;
103     }
104   }
105   
106   private static final Logger LOG =
107       LoggerFactory.getInstance().getLogger(AggregationSynchronizer.class);
108   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
109
110   private boolean allWorkEnumerated;
111   private Deque<SelfLinkDescriptor> selflinks;
112   private Deque<RetryAggregationEntitySyncContainer> retryQueue;
113   private Map<String, Integer> retryLimitTracker;
114   protected ExecutorService esPutExecutor;
115   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
116   private boolean syncInProgress;
117   private Map<String, String> contextMap;
118   private String entityType;
119   private ElasticSearchSchemaConfig schemaConfig;
120   private OxmEntityLookup oxmEntityLookup;
121   
122   /**
123    * Instantiates a new entity aggregation synchronizer.
124    *
125    * @param indexName the index name
126    * @throws Exception the exception
127    */
128   public AggregationSynchronizer(String entityType, ElasticSearchSchemaConfig schemaConfig,
129       int numSyncWorkers, int numActiveInventoryWorkers, int numElasticWorkers,
130       NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig,
131       OxmEntityLookup oxmEntityLookup) throws Exception {
132     
133     super(LOG, "AGGES-" + schemaConfig.getIndexName().toUpperCase(), numSyncWorkers,
134         numActiveInventoryWorkers, numElasticWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig); 
135     
136     this.oxmEntityLookup = oxmEntityLookup;
137     
138     this.schemaConfig = schemaConfig;
139     this.entityType = entityType;
140     this.allWorkEnumerated = false;
141     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
142     this.synchronizerName = "Entity Aggregation Synchronizer";
143     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
144     this.syncInProgress = false;
145     this.allWorkEnumerated = false;
146     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
147     this.retryQueue = new ConcurrentLinkedDeque<RetryAggregationEntitySyncContainer>();
148     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
149     
150     this.esPutExecutor = NodeUtils.createNamedExecutor("AGGES-ES-PUT", 1, LOG);
151     
152     this.aaiEntityStats.intializeEntityCounters(entityType);
153     this.esEntityStats.intializeEntityCounters(entityType);
154         
155     this.contextMap = MDC.getCopyOfContextMap();
156   }
157
158   /**
159    * Collect all the work.
160    *
161    * @return the operation state
162    */
163   private OperationState collectAllTheWork() {
164     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
165     final String entity = this.getEntityType();
166     try {
167
168       aaiWorkOnHand.set(1);
169
170       supplyAsync(new Supplier<Void>() {
171
172         @Override
173         public Void get() {
174           MDC.setContextMap(contextMap);
175           OperationResult typeLinksResult = null;
176           try {
177             typeLinksResult = aaiAdapter.getSelfLinksByEntityType(entity);
178             aaiWorkOnHand.decrementAndGet();
179             processEntityTypeSelfLinks(typeLinksResult);
180           } catch (Exception exc) {
181                   LOG.error(AaiUiMsgs.ERROR_GENERIC, "Processing execption while building working set.  Error:" 
182                        + exc.getMessage());
183           }
184
185           return null;
186         }
187
188       }, aaiExecutor).whenComplete((result, error) -> {
189
190         if (error != null) {
191           LOG.error(AaiUiMsgs.ERROR_GENERIC,
192               "An error occurred getting data from AAI. Error = " + error.getMessage());
193         }
194       });
195
196       while (aaiWorkOnHand.get() != 0) {
197
198         if (LOG.isDebugEnabled()) {
199           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
200         }
201
202         Thread.sleep(1000);
203       }
204
205       aaiWorkOnHand.set(selflinks.size());
206       allWorkEnumerated = true;
207       syncEntityTypes();
208
209       while (!isSyncDone()) {
210         performRetrySync();
211         Thread.sleep(1000);
212       }
213
214       /*
215        * Make sure we don't hang on to retries that failed which could cause issues during future
216        * syncs
217        */
218       retryLimitTracker.clear();
219
220     } catch (Exception exc) {
221       // TODO -> LOG, waht should be logged here?
222     }
223
224     return OperationState.OK;
225   }
226   
227   
228   /**
229    * Perform retry sync.
230    */
231   private void performRetrySync() {
232     while (retryQueue.peek() != null) {
233
234       RetryAggregationEntitySyncContainer rsc = retryQueue.poll();
235       if (rsc != null) {
236
237         AggregationEntity ae = rsc.getAggregationEntity();
238         NetworkTransaction txn = rsc.getNetworkTransaction();
239
240         String link = null;
241         try {
242           /*
243            * In this retry flow the se object has already derived its fields
244            */
245           link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
246         } catch (Exception exc) {
247           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
248         }
249
250         if (link != null) {
251           NetworkTransaction retryTransaction = new NetworkTransaction();
252           retryTransaction.setLink(link);
253           retryTransaction.setEntityType(txn.getEntityType());
254           retryTransaction.setDescriptor(txn.getDescriptor());
255           retryTransaction.setOperationType(HttpMethod.GET);
256
257           /*
258            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
259            * called incrementAndGet when queuing the failed PUT!
260            */
261
262           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
263               esExecutor).whenComplete((result, error) -> {
264
265                 esWorkOnHand.decrementAndGet();
266
267                 if (error != null) {
268                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
269                 } else {
270                   updateElasticSearchCounters(result);
271                   performDocumentUpsert(result, ae);
272                 }
273               });
274         }
275
276       }
277     }
278   }
279   
280   /**
281    * Perform document upsert.
282    *
283    * @param esGetTxn the es get txn
284    * @param ae the ae
285    */
286   protected void performDocumentUpsert(NetworkTransaction esGetTxn, AggregationEntity ae) {
287     /**
288      * <p>
289      * <ul>
290      * As part of the response processing we need to do the following:
291      * <li>1. Extract the version (if present), it will be the ETAG when we use the
292      * Search-Abstraction-Service
293      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
294      * tag
295      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
296      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
297      * </ul>
298      * </p>
299      */
300     String link = null;
301     try {
302       link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId()); 
303     } catch (Exception exc) {
304       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
305       return;
306     }
307
308     String versionNumber = null;
309     boolean wasEntryDiscovered = false;
310     if (esGetTxn.getOperationResult().getResultCode() == 404) {
311       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, ae.getEntityPrimaryKeyValue());
312     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
313       wasEntryDiscovered = true;
314       try {
315         versionNumber = NodeUtils.extractFieldValueFromObject(
316             NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
317             "_version");
318       } catch (IOException exc) {
319         String message =
320             "Error extracting version number from response, aborting aggregation entity sync of "
321                 + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
322         LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
323         return;
324       }
325     } else {
326       /*
327        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
328        * return.
329        */
330       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
331           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
332       return;
333     }
334
335     try {
336       String jsonPayload = null;
337       if (wasEntryDiscovered) {
338         try {
339           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
340           NodeUtils.extractObjectsByKey(
341               NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
342               "_source", sourceObject);
343
344           if (!sourceObject.isEmpty()) {
345             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
346             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
347             ObjectReader updater = mapper.readerForUpdating(me);
348             MergableEntity merged = updater.readValue(ae.getAsJson());
349             jsonPayload = mapper.writeValueAsString(merged);
350           }
351         } catch (IOException exc) {
352           String message =
353               "Error extracting source value from response, aborting aggregation entity sync of "
354                   + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
355           LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
356           return;
357         }
358       } else {
359         jsonPayload = ae.getAsJson();
360       }
361
362       if (wasEntryDiscovered) {
363         if (versionNumber != null && jsonPayload != null) {
364
365           String requestPayload =
366               elasticSearchAdapter.buildBulkImportOperationRequest(schemaConfig.getIndexName(),
367                   schemaConfig.getIndexDocType(), ae.getId(), versionNumber, jsonPayload);
368
369           NetworkTransaction transactionTracker = new NetworkTransaction();
370           transactionTracker.setEntityType(esGetTxn.getEntityType());
371           transactionTracker.setDescriptor(esGetTxn.getDescriptor());
372           transactionTracker.setOperationType(HttpMethod.PUT);
373
374           esWorkOnHand.incrementAndGet();
375           supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(),
376               requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
377                   .whenComplete((result, error) -> {
378
379                     esWorkOnHand.decrementAndGet();
380
381                     if (error != null) {
382                       String message = "Aggregation entity sync UPDATE PUT error - "
383                           + error.getLocalizedMessage();
384                       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
385                     } else {
386                       updateElasticSearchCounters(result);
387                       processStoreDocumentResult(result, esGetTxn, ae);
388                     }
389                   });
390         }
391
392       } else {
393         if (link != null && jsonPayload != null) {
394
395           NetworkTransaction updateElasticTxn = new NetworkTransaction();
396           updateElasticTxn.setLink(link);
397           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
398           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
399           updateElasticTxn.setOperationType(HttpMethod.PUT);
400
401           esWorkOnHand.incrementAndGet();
402           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
403               esPutExecutor).whenComplete((result, error) -> {
404
405                 esWorkOnHand.decrementAndGet();
406
407                 if (error != null) {
408                   String message =
409                       "Aggregation entity sync UPDATE PUT error - " + error.getLocalizedMessage();
410                   LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
411                 } else {
412                   updateElasticSearchCounters(result);
413                   processStoreDocumentResult(result, esGetTxn, ae);
414                 }
415               });
416         }
417       }
418     } catch (Exception exc) {
419       String message = "Exception caught during aggregation entity sync PUT operation. Message - "
420           + exc.getLocalizedMessage();
421       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
422     }
423   }
424   
425   /**
426    * Should allow retry.
427    *
428    * @param id the id
429    * @return true, if successful
430    */
431   private boolean shouldAllowRetry(String id) {
432     boolean isRetryAllowed = true;
433     if (retryLimitTracker.get(id) != null) {
434       Integer currentCount = retryLimitTracker.get(id);
435       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
436         isRetryAllowed = false;
437         String message = "Aggregation entity re-sync limit reached for " + id
438             + ", re-sync will no longer be attempted for this entity";
439         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
440       } else {
441         Integer newCount = new Integer(currentCount.intValue() + 1);
442         retryLimitTracker.put(id, newCount);
443       }
444     } else {
445       Integer firstRetryCount = new Integer(1);
446       retryLimitTracker.put(id, firstRetryCount);
447     }
448
449     return isRetryAllowed;
450   }
451   
452   /**
453    * Process store document result.
454    *
455    * @param esPutResult the es put result
456    * @param esGetResult the es get result
457    * @param ae the ae
458    */
459   private void processStoreDocumentResult(NetworkTransaction esPutResult,
460       NetworkTransaction esGetResult, AggregationEntity ae) {
461
462     OperationResult or = esPutResult.getOperationResult();
463
464     if (!or.wasSuccessful()) {
465       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
466
467         if (shouldAllowRetry(ae.getId())) {
468           esWorkOnHand.incrementAndGet();
469
470           RetryAggregationEntitySyncContainer rsc =
471               new RetryAggregationEntitySyncContainer(esGetResult, ae);
472           retryQueue.push(rsc);
473
474           String message = "Store document failed during aggregation entity synchronization"
475               + " due to version conflict. Entity will be re-synced.";
476           LOG.warn(AaiUiMsgs.ERROR_GENERIC, message);
477         }
478       } else {
479         String message =
480             "Store document failed during aggregation entity synchronization with result code "
481                 + or.getResultCode() + " and result message " + or.getResult();
482         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
483       }
484     }
485   }
486   
487   /**
488    * Sync entity types.
489    */
490   private void syncEntityTypes() {
491
492     while (selflinks.peek() != null) {
493
494       SelfLinkDescriptor linkDescriptor = selflinks.poll();
495       aaiWorkOnHand.decrementAndGet();
496
497       OxmEntityDescriptor descriptor = null;
498
499       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
500
501         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
502
503         if (descriptor == null) {
504           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
505           // go to next element in iterator
506           continue;
507         }
508
509         NetworkTransaction txn = new NetworkTransaction();
510         txn.setDescriptor(descriptor);
511         txn.setLink(linkDescriptor.getSelfLink());
512         txn.setOperationType(HttpMethod.GET);
513         txn.setEntityType(linkDescriptor.getEntityType());
514
515         aaiWorkOnHand.incrementAndGet();
516
517         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
518             .whenComplete((result, error) -> {
519
520               aaiWorkOnHand.decrementAndGet();
521
522               if (error != null) {
523                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
524               } else {
525                 if (result == null) {
526                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
527                       linkDescriptor.getSelfLink());
528                 } else {
529                   updateActiveInventoryCounters(result);
530                   fetchDocumentForUpsert(result);
531                 }
532               }
533             });
534       }
535
536     }
537
538   }
539   
540   /**
541    * Fetch document for upsert.
542    *
543    * @param txn the txn
544    */
545   private void fetchDocumentForUpsert(NetworkTransaction txn) {
546     // modified
547     if (!txn.getOperationResult().wasSuccessful()) {
548       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
549       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
550       return;
551     }
552
553     try {
554         final String jsonResult = txn.getOperationResult().getResult();
555         if (jsonResult != null && jsonResult.length() > 0) {
556
557           AggregationEntity ae = new AggregationEntity();
558           ae.setLink(ActiveInventoryAdapter.extractResourcePath(txn.getLink()));
559           populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
560           ae.deriveFields();
561
562           String link = null;
563           try {
564             link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
565           } catch (Exception exc) {
566             LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
567           }
568
569           if (link != null) {
570             NetworkTransaction n2 = new NetworkTransaction();
571             n2.setLink(link);
572             n2.setEntityType(txn.getEntityType());
573             n2.setDescriptor(txn.getDescriptor());
574             n2.setOperationType(HttpMethod.GET);
575
576             esWorkOnHand.incrementAndGet();
577
578             supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
579                 .whenComplete((result, error) -> {
580
581                   esWorkOnHand.decrementAndGet();
582
583                   if (error != null) {
584                     LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
585                   } else {
586                     updateElasticSearchCounters(result);
587                     performDocumentUpsert(result, ae);
588                   }
589                 });
590           }
591         }
592
593                 } catch (JsonProcessingException exc) {
594                         LOG.error(AaiUiMsgs.ERROR_GENERIC,
595                                         "There was a JSON processing error fetching the elastic document for upsert.  Error: "
596                                                         + exc.getMessage());
597                 } catch (IOException exc) {
598                         LOG.error(AaiUiMsgs.ERROR_GENERIC,
599                                         "There was an IO error fetching the elastic document for upsert.  Error: " + exc.getMessage());
600                 }
601           }
602   
603   
604   /**
605    * Populate aggregation entity document.
606    *
607    * @param doc the doc
608    * @param result the result
609    * @param resultDescriptor the result descriptor
610    * @throws JsonProcessingException the json processing exception
611    * @throws IOException Signals that an I/O exception has occurred.
612    */
613   protected void populateAggregationEntityDocument(AggregationEntity doc, String result,
614       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
615     doc.setEntityType(resultDescriptor.getEntityName());
616     JsonNode entityNode = mapper.readTree(result);
617     Map<String, Object> map = mapper.convertValue(entityNode, Map.class);
618     doc.copyAttributeKeyValuePair(map);
619   }
620   
621   /**
622    * Process entity type self links.
623    *
624    * @param operationResult the operation result
625    */
626   private void processEntityTypeSelfLinks(OperationResult operationResult) {
627
628     JsonNode rootNode = null;
629     
630     if ( operationResult == null ) {
631         return;
632     }
633
634     final String jsonResult = operationResult.getResult();
635
636     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
637
638       try {
639         rootNode = mapper.readTree(jsonResult);
640       } catch (IOException exc) {
641         String message =
642             "Could not deserialize JSON (representing operation result) as node tree. " +
643             "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
644         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
645       }
646
647       JsonNode resultData = rootNode.get("result-data");
648       ArrayNode resultDataArrayNode = null;
649
650       if (resultData.isArray()) {
651         resultDataArrayNode = (ArrayNode) resultData;
652
653         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
654         JsonNode element = null;
655
656         while (elementIterator.hasNext()) {
657           element = elementIterator.next();
658
659           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
660           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
661
662           OxmEntityDescriptor descriptor = null;
663
664           if (resourceType != null && resourceLink != null) {
665
666             descriptor = oxmEntityLookup.getEntityDescriptors().get(resourceType);
667
668             if (descriptor == null) {
669               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
670               // go to next element in iterator
671               continue;
672             }
673
674             selflinks.add(new SelfLinkDescriptor(resourceLink, SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
675             
676
677           }
678         }
679       }
680     }
681
682   }
683
684   /*
685    * (non-Javadoc)
686    * 
687    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
688    */
689   @Override
690   public OperationState doSync() {
691     this.syncDurationInMs = -1;
692     syncStartedTimeStampInMs = System.currentTimeMillis();
693     String txnID = NodeUtils.getRandomTxnId();
694     MdcContext.initialize(txnID, "AggregationSynchronizer", "", "Sync", "");
695     
696     return collectAllTheWork();
697   }
698
699   @Override
700   public SynchronizerState getState() {
701
702     if (!isSyncDone()) {
703       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
704     }
705
706     return SynchronizerState.IDLE;
707
708   }
709
710   /*
711    * (non-Javadoc)
712    * 
713    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
714    */
715   @Override
716   public String getStatReport(boolean showFinalReport) {
717     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
718     return getStatReport(syncDurationInMs, showFinalReport);
719   }
720
721   public String getEntityType() {
722     return entityType;
723   }
724
725   public void setEntityType(String entityType) {
726     this.entityType = entityType;
727   }
728
729   /*
730    * (non-Javadoc)
731    * 
732    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
733    */
734   @Override
735   public void shutdown() {
736     this.shutdownExecutors();
737   }
738
739   @Override
740   protected boolean isSyncDone() {
741
742     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
743
744     if (LOG.isDebugEnabled()) {
745       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
746           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
747     }
748
749     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
750       return false;
751     }
752
753     this.syncInProgress = false;
754
755     return true;
756   }
757
758   /*
759    * (non-Javadoc)
760    * 
761    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
762    */
763   @Override
764   public void clearCache() {
765
766     if (syncInProgress) {
767       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
768           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
769       return;
770     }
771
772     super.clearCache();
773     this.resetCounters();
774     if (entityCounters != null) {
775       entityCounters.clear();
776     }
777
778     allWorkEnumerated = false;
779
780   }
781
782 }