Fix potential null pointer places
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / aggregation / sync / AggregationSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.aggregation.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Deque;
28 import java.util.EnumSet;
29 import java.util.Iterator;
30 import java.util.Map;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ConcurrentLinkedDeque;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.function.Supplier;
36
37 import org.onap.aai.cl.api.Logger;
38 import org.onap.aai.cl.eelf.LoggerFactory;
39 import org.onap.aai.cl.mdc.MdcContext;
40 import org.onap.aai.restclient.client.OperationResult;
41 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
42 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
43 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
44 import org.onap.aai.sparky.dal.NetworkTransaction;
45 import org.onap.aai.sparky.dal.rest.HttpMethod;
46 import org.onap.aai.sparky.logging.AaiUiMsgs;
47 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
48 import org.onap.aai.sparky.sync.IndexSynchronizer;
49 import org.onap.aai.sparky.sync.SynchronizerConstants;
50 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
51 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
52 import org.onap.aai.sparky.sync.entity.AggregationEntity;
53 import org.onap.aai.sparky.sync.entity.MergableEntity;
54 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
55 import org.onap.aai.sparky.sync.enumeration.OperationState;
56 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
57 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
58 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
59 import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
60 import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
61 import org.onap.aai.sparky.util.NodeUtils;
62 import org.slf4j.MDC;
63
64 import com.fasterxml.jackson.core.JsonProcessingException;
65 import com.fasterxml.jackson.databind.JsonNode;
66 import com.fasterxml.jackson.databind.ObjectReader;
67 import com.fasterxml.jackson.databind.node.ArrayNode;
68
69 /**
70  * The Class AutosuggestionSynchronizer.
71  */
72 public class AggregationSynchronizer extends AbstractEntitySynchronizer
73     implements IndexSynchronizer {
74
75   /**
76    * The Class RetryAggregationEntitySyncContainer.
77    */
78   private class RetryAggregationEntitySyncContainer {
79     NetworkTransaction txn;
80     AggregationEntity ae;
81
82     /**
83      * Instantiates a new retry aggregation entity sync container.
84      *
85      * @param txn the txn
86      * @param ae the se
87      */
88     public RetryAggregationEntitySyncContainer(NetworkTransaction txn, AggregationEntity ae) {
89       this.txn = txn;
90       this.ae = ae;
91     }
92
93     public NetworkTransaction getNetworkTransaction() {
94       return txn;
95     }
96
97     public AggregationEntity getAggregationEntity() {
98       return ae;
99     }
100   }
101   
102   private static final Logger LOG =
103       LoggerFactory.getInstance().getLogger(AggregationSynchronizer.class);
104   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
105
106   private boolean allWorkEnumerated;
107   private Deque<SelfLinkDescriptor> selflinks;
108   private Deque<RetryAggregationEntitySyncContainer> retryQueue;
109   private Map<String, Integer> retryLimitTracker;
110   protected ExecutorService esPutExecutor;
111   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
112   private boolean syncInProgress;
113   private Map<String, String> contextMap;
114   private String entityType;
115   private ElasticSearchSchemaConfig schemaConfig;
116   private OxmEntityLookup oxmEntityLookup;
117   
118   /**
119    * Instantiates a new entity aggregation synchronizer.
120    *
121    * @throws Exception the exception
122    */
123   public AggregationSynchronizer(String entityType, ElasticSearchSchemaConfig schemaConfig,
124       int numSyncWorkers, int numActiveInventoryWorkers, int numElasticWorkers,
125       NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig,
126       OxmEntityLookup oxmEntityLookup) throws Exception {
127     
128     super(LOG, "AGGES-" + schemaConfig.getIndexName().toUpperCase(), numSyncWorkers,
129         numActiveInventoryWorkers, numElasticWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig); 
130     
131     this.oxmEntityLookup = oxmEntityLookup;
132     
133     this.schemaConfig = schemaConfig;
134     this.entityType = entityType;
135     this.allWorkEnumerated = false;
136     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
137     this.synchronizerName = "Entity Aggregation Synchronizer";
138     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
139     this.syncInProgress = false;
140     this.allWorkEnumerated = false;
141     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
142     this.retryQueue = new ConcurrentLinkedDeque<RetryAggregationEntitySyncContainer>();
143     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
144     
145     this.esPutExecutor = NodeUtils.createNamedExecutor("AGGES-ES-PUT", 1, LOG);
146     
147     this.aaiEntityStats.intializeEntityCounters(entityType);
148     this.esEntityStats.intializeEntityCounters(entityType);
149         
150     this.contextMap = MDC.getCopyOfContextMap();
151   }
152
153   /**
154    * Collect all the work.
155    *
156    * @return the operation state
157    */
158   private OperationState collectAllTheWork() {
159     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
160     final String entity = this.getEntityType();
161     try {
162
163       aaiWorkOnHand.set(1);
164
165       supplyAsync(new Supplier<Void>() {
166
167         @Override
168         public Void get() {
169           MDC.setContextMap(contextMap);
170           OperationResult typeLinksResult = null;
171           try {
172             typeLinksResult = aaiAdapter.getSelfLinksByEntityType(entity);
173             aaiWorkOnHand.decrementAndGet();
174             processEntityTypeSelfLinks(typeLinksResult);
175           } catch (Exception exc) {
176                   LOG.error(AaiUiMsgs.ERROR_GENERIC, "Processing execption while building working set.  Error:" 
177                        + exc.getMessage());
178           }
179
180           return null;
181         }
182
183       }, aaiExecutor).whenComplete((result, error) -> {
184
185         if (error != null) {
186           LOG.error(AaiUiMsgs.ERROR_GENERIC,
187               "An error occurred getting data from AAI. Error = " + error.getMessage());
188         }
189       });
190
191       while (aaiWorkOnHand.get() != 0) {
192
193         if (LOG.isDebugEnabled()) {
194           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
195         }
196
197         Thread.sleep(1000);
198       }
199
200       aaiWorkOnHand.set(selflinks.size());
201       allWorkEnumerated = true;
202       syncEntityTypes();
203
204       while (!isSyncDone()) {
205         performRetrySync();
206         Thread.sleep(1000);
207       }
208
209       /*
210        * Make sure we don't hang on to retries that failed which could cause issues during future
211        * syncs
212        */
213       retryLimitTracker.clear();
214
215     } catch (Exception exc) {
216       // TODO -> LOG, waht should be logged here?
217     }
218
219     return OperationState.OK;
220   }
221   
222   
223   /**
224    * Perform retry sync.
225    */
226   private void performRetrySync() {
227     while (retryQueue.peek() != null) {
228
229       RetryAggregationEntitySyncContainer rsc = retryQueue.poll();
230       if (rsc != null) {
231
232         AggregationEntity ae = rsc.getAggregationEntity();
233         NetworkTransaction txn = rsc.getNetworkTransaction();
234
235         String link = null;
236         try {
237           /*
238            * In this retry flow the se object has already derived its fields
239            */
240           link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
241         } catch (Exception exc) {
242           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
243         }
244
245         if (link != null) {
246           NetworkTransaction retryTransaction = new NetworkTransaction();
247           retryTransaction.setLink(link);
248           retryTransaction.setEntityType(txn.getEntityType());
249           retryTransaction.setDescriptor(txn.getDescriptor());
250           retryTransaction.setOperationType(HttpMethod.GET);
251
252           /*
253            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
254            * called incrementAndGet when queuing the failed PUT!
255            */
256
257           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
258               esExecutor).whenComplete((result, error) -> {
259
260                 esWorkOnHand.decrementAndGet();
261
262                 if (error != null) {
263                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
264                 } else {
265                   updateElasticSearchCounters(result);
266                   performDocumentUpsert(result, ae);
267                 }
268               });
269         }
270
271       }
272     }
273   }
274   
275   /**
276    * Perform document upsert.
277    *
278    * @param esGetTxn the es get txn
279    * @param ae the ae
280    */
281   protected void performDocumentUpsert(NetworkTransaction esGetTxn, AggregationEntity ae) {
282     /**
283      * <p>
284      * <ul>
285      * As part of the response processing we need to do the following:
286      * <li>1. Extract the version (if present), it will be the ETAG when we use the
287      * Search-Abstraction-Service
288      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
289      * tag
290      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
291      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
292      * </ul>
293      * </p>
294      */
295     String link = null;
296     try {
297       link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId()); 
298     } catch (Exception exc) {
299       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
300       return;
301     }
302
303     String versionNumber = null;
304     boolean wasEntryDiscovered = false;
305     if (esGetTxn.getOperationResult().getResultCode() == 404) {
306       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, ae.getEntityPrimaryKeyValue());
307     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
308       wasEntryDiscovered = true;
309       try {
310         versionNumber = NodeUtils.extractFieldValueFromObject(
311             NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
312             "_version");
313       } catch (IOException exc) {
314         String message =
315             "Error extracting version number from response, aborting aggregation entity sync of "
316                 + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
317         LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
318         return;
319       }
320     } else {
321       /*
322        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
323        * return.
324        */
325       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
326           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
327       return;
328     }
329
330     try {
331       String jsonPayload = null;
332       if (wasEntryDiscovered) {
333         try {
334           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
335           NodeUtils.extractObjectsByKey(
336               NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
337               "_source", sourceObject);
338
339           if (!sourceObject.isEmpty()) {
340             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
341             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
342             ObjectReader updater = mapper.readerForUpdating(me);
343             MergableEntity merged = updater.readValue(ae.getAsJson());
344             jsonPayload = mapper.writeValueAsString(merged);
345           }
346         } catch (IOException exc) {
347           String message =
348               "Error extracting source value from response, aborting aggregation entity sync of "
349                   + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
350           LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
351           return;
352         }
353       } else {
354         jsonPayload = ae.getAsJson();
355       }
356
357       if (wasEntryDiscovered) {
358         if (versionNumber != null && jsonPayload != null) {
359
360           String requestPayload =
361               elasticSearchAdapter.buildBulkImportOperationRequest(schemaConfig.getIndexName(),
362                   schemaConfig.getIndexDocType(), ae.getId(), versionNumber, jsonPayload);
363
364           NetworkTransaction transactionTracker = new NetworkTransaction();
365           transactionTracker.setEntityType(esGetTxn.getEntityType());
366           transactionTracker.setDescriptor(esGetTxn.getDescriptor());
367           transactionTracker.setOperationType(HttpMethod.PUT);
368
369           esWorkOnHand.incrementAndGet();
370           supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(),
371               requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
372                   .whenComplete((result, error) -> {
373
374                     esWorkOnHand.decrementAndGet();
375
376                     if (error != null) {
377                       String message = "Aggregation entity sync UPDATE PUT error - "
378                           + error.getLocalizedMessage();
379                       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
380                     } else {
381                       updateElasticSearchCounters(result);
382                       processStoreDocumentResult(result, esGetTxn, ae);
383                     }
384                   });
385         }
386
387       } else {
388         if (link != null && jsonPayload != null) {
389
390           NetworkTransaction updateElasticTxn = new NetworkTransaction();
391           updateElasticTxn.setLink(link);
392           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
393           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
394           updateElasticTxn.setOperationType(HttpMethod.PUT);
395
396           esWorkOnHand.incrementAndGet();
397           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
398               esPutExecutor).whenComplete((result, error) -> {
399
400                 esWorkOnHand.decrementAndGet();
401
402                 if (error != null) {
403                   String message =
404                       "Aggregation entity sync UPDATE PUT error - " + error.getLocalizedMessage();
405                   LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
406                 } else {
407                   updateElasticSearchCounters(result);
408                   processStoreDocumentResult(result, esGetTxn, ae);
409                 }
410               });
411         }
412       }
413     } catch (Exception exc) {
414       String message = "Exception caught during aggregation entity sync PUT operation. Message - "
415           + exc.getLocalizedMessage();
416       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
417     }
418   }
419   
420   /**
421    * Should allow retry.
422    *
423    * @param id the id
424    * @return true, if successful
425    */
426   private boolean shouldAllowRetry(String id) {
427     boolean isRetryAllowed = true;
428     if (retryLimitTracker.get(id) != null) {
429       Integer currentCount = retryLimitTracker.get(id);
430       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
431         isRetryAllowed = false;
432         String message = "Aggregation entity re-sync limit reached for " + id
433             + ", re-sync will no longer be attempted for this entity";
434         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
435       } else {
436         Integer newCount = new Integer(currentCount.intValue() + 1);
437         retryLimitTracker.put(id, newCount);
438       }
439     } else {
440       Integer firstRetryCount = new Integer(1);
441       retryLimitTracker.put(id, firstRetryCount);
442     }
443
444     return isRetryAllowed;
445   }
446   
447   /**
448    * Process store document result.
449    *
450    * @param esPutResult the es put result
451    * @param esGetResult the es get result
452    * @param ae the ae
453    */
454   private void processStoreDocumentResult(NetworkTransaction esPutResult,
455       NetworkTransaction esGetResult, AggregationEntity ae) {
456
457     OperationResult or = esPutResult.getOperationResult();
458
459     if (!or.wasSuccessful()) {
460       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
461
462         if (shouldAllowRetry(ae.getId())) {
463           esWorkOnHand.incrementAndGet();
464
465           RetryAggregationEntitySyncContainer rsc =
466               new RetryAggregationEntitySyncContainer(esGetResult, ae);
467           retryQueue.push(rsc);
468
469           String message = "Store document failed during aggregation entity synchronization"
470               + " due to version conflict. Entity will be re-synced.";
471           LOG.warn(AaiUiMsgs.ERROR_GENERIC, message);
472         }
473       } else {
474         String message =
475             "Store document failed during aggregation entity synchronization with result code "
476                 + or.getResultCode() + " and result message " + or.getResult();
477         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
478       }
479     }
480   }
481   
482   /**
483    * Sync entity types.
484    */
485   private void syncEntityTypes() {
486
487     while (selflinks.peek() != null) {
488
489       SelfLinkDescriptor linkDescriptor = selflinks.poll();
490       aaiWorkOnHand.decrementAndGet();
491
492       OxmEntityDescriptor descriptor = null;
493
494       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
495
496         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
497
498         if (descriptor == null) {
499           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
500           // go to next element in iterator
501           continue;
502         }
503
504         NetworkTransaction txn = new NetworkTransaction();
505         txn.setDescriptor(descriptor);
506         txn.setLink(linkDescriptor.getSelfLink());
507         txn.setOperationType(HttpMethod.GET);
508         txn.setEntityType(linkDescriptor.getEntityType());
509
510         aaiWorkOnHand.incrementAndGet();
511
512         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
513             .whenComplete((result, error) -> {
514
515               aaiWorkOnHand.decrementAndGet();
516
517               if (error != null) {
518                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
519               } else {
520                 if (result == null) {
521                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
522                       linkDescriptor.getSelfLink());
523                 } else {
524                   updateActiveInventoryCounters(result);
525                   fetchDocumentForUpsert(result);
526                 }
527               }
528             });
529       }
530
531     }
532
533   }
534   
535   /**
536    * Fetch document for upsert.
537    *
538    * @param txn the txn
539    */
540   private void fetchDocumentForUpsert(NetworkTransaction txn) {
541     // modified
542     if (!txn.getOperationResult().wasSuccessful()) {
543       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
544       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
545       return;
546     }
547
548     try {
549         final String jsonResult = txn.getOperationResult().getResult();
550         if (jsonResult != null && jsonResult.length() > 0) {
551
552           AggregationEntity ae = new AggregationEntity();
553           ae.setLink(ActiveInventoryAdapter.extractResourcePath(txn.getLink()));
554           populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
555           ae.deriveFields();
556
557           String link = null;
558           try {
559             link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
560           } catch (Exception exc) {
561             LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
562           }
563
564           if (link != null) {
565             NetworkTransaction n2 = new NetworkTransaction();
566             n2.setLink(link);
567             n2.setEntityType(txn.getEntityType());
568             n2.setDescriptor(txn.getDescriptor());
569             n2.setOperationType(HttpMethod.GET);
570
571             esWorkOnHand.incrementAndGet();
572
573             supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
574                 .whenComplete((result, error) -> {
575
576                   esWorkOnHand.decrementAndGet();
577
578                   if (error != null) {
579                     LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
580                   } else {
581                     updateElasticSearchCounters(result);
582                     performDocumentUpsert(result, ae);
583                   }
584                 });
585           }
586         }
587
588                 } catch (JsonProcessingException exc) {
589                         LOG.error(AaiUiMsgs.ERROR_GENERIC,
590                                         "There was a JSON processing error fetching the elastic document for upsert.  Error: "
591                                                         + exc.getMessage());
592                 } catch (IOException exc) {
593                         LOG.error(AaiUiMsgs.ERROR_GENERIC,
594                                         "There was an IO error fetching the elastic document for upsert.  Error: " + exc.getMessage());
595                 }
596           }
597   
598   
599   /**
600    * Populate aggregation entity document.
601    *
602    * @param doc the doc
603    * @param result the result
604    * @param resultDescriptor the result descriptor
605    * @throws JsonProcessingException the json processing exception
606    * @throws IOException Signals that an I/O exception has occurred.
607    */
608   protected void populateAggregationEntityDocument(AggregationEntity doc, String result,
609       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
610     doc.setEntityType(resultDescriptor.getEntityName());
611     JsonNode entityNode = mapper.readTree(result);
612     Map<String, Object> map = mapper.convertValue(entityNode, Map.class);
613     doc.copyAttributeKeyValuePair(map);
614   }
615
616   /**
617    * Process entity type self links.
618    *
619    * @param operationResult the operation result
620    */
621   private void processEntityTypeSelfLinks(OperationResult operationResult) {
622
623     if (operationResult == null) {
624       return;
625     }
626
627     final String jsonResult = operationResult.getResult();
628
629     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
630
631       try {
632         JsonNode rootNode = mapper.readTree(jsonResult);
633
634         JsonNode resultData = rootNode.get("result-data");
635
636         if (resultData.isArray()) {
637           ArrayNode resultDataArrayNode = (ArrayNode) resultData;
638
639           Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
640
641           while (elementIterator.hasNext()) {
642             JsonNode element = elementIterator.next();
643
644             final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
645             final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
646
647             if (resourceType != null && resourceLink != null) {
648
649               OxmEntityDescriptor descriptor = oxmEntityLookup.getEntityDescriptors().get(resourceType);
650
651               if (descriptor == null) {
652                 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
653                 // go to next element in iterator
654                 continue;
655               }
656
657               selflinks.add(new SelfLinkDescriptor(resourceLink, SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
658
659             }
660           }
661         }
662       } catch (IOException exc) {
663         String message =
664                 "Could not deserialize JSON (representing operation result) as node tree. " +
665                         "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
666         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
667       }
668     }
669
670   }
671
672   /*
673    * (non-Javadoc)
674    * 
675    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
676    */
677   @Override
678   public OperationState doSync() {
679     this.syncDurationInMs = -1;
680     syncStartedTimeStampInMs = System.currentTimeMillis();
681     String txnID = NodeUtils.getRandomTxnId();
682     MdcContext.initialize(txnID, "AggregationSynchronizer", "", "Sync", "");
683     
684     return collectAllTheWork();
685   }
686
687   @Override
688   public SynchronizerState getState() {
689
690     if (!isSyncDone()) {
691       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
692     }
693
694     return SynchronizerState.IDLE;
695
696   }
697
698   /*
699    * (non-Javadoc)
700    * 
701    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
702    */
703   @Override
704   public String getStatReport(boolean showFinalReport) {
705     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
706     return getStatReport(syncDurationInMs, showFinalReport);
707   }
708
709   public String getEntityType() {
710     return entityType;
711   }
712
713   public void setEntityType(String entityType) {
714     this.entityType = entityType;
715   }
716
717   /*
718    * (non-Javadoc)
719    * 
720    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
721    */
722   @Override
723   public void shutdown() {
724     this.shutdownExecutors();
725   }
726
727   @Override
728   protected boolean isSyncDone() {
729
730     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
731
732     if (LOG.isDebugEnabled()) {
733       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
734           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
735     }
736
737     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
738       return false;
739     }
740
741     this.syncInProgress = false;
742
743     return true;
744   }
745
746   /*
747    * (non-Javadoc)
748    * 
749    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
750    */
751   @Override
752   public void clearCache() {
753
754     if (syncInProgress) {
755       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
756           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
757       return;
758     }
759
760     super.clearCache();
761     this.resetCounters();
762     if (entityCounters != null) {
763       entityCounters.clear();
764     }
765
766     allWorkEnumerated = false;
767
768   }
769
770 }