Adding interfaces in documentation
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / aggregation / sync / AggregationSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.aggregation.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Deque;
28 import java.util.EnumSet;
29 import java.util.Iterator;
30 import java.util.Map;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ConcurrentLinkedDeque;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.function.Supplier;
36
37 import org.onap.aai.cl.api.Logger;
38 import org.onap.aai.cl.eelf.LoggerFactory;
39 import org.onap.aai.cl.mdc.MdcContext;
40 import org.onap.aai.restclient.client.OperationResult;
41 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
42 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
43 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
44 import org.onap.aai.sparky.dal.NetworkTransaction;
45 import org.onap.aai.sparky.dal.rest.HttpMethod;
46 import org.onap.aai.sparky.logging.AaiUiMsgs;
47 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
48 import org.onap.aai.sparky.sync.IndexSynchronizer;
49 import org.onap.aai.sparky.sync.SynchronizerConstants;
50 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
51 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
52 import org.onap.aai.sparky.sync.entity.AggregationEntity;
53 import org.onap.aai.sparky.sync.entity.MergableEntity;
54 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
55 import org.onap.aai.sparky.sync.enumeration.OperationState;
56 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
57 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
58 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
59 import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
60 import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
61 import org.onap.aai.sparky.util.NodeUtils;
62 import org.slf4j.MDC;
63
64 import com.fasterxml.jackson.core.JsonProcessingException;
65 import com.fasterxml.jackson.databind.JsonNode;
66 import com.fasterxml.jackson.databind.ObjectReader;
67 import com.fasterxml.jackson.databind.node.ArrayNode;
68
69 /**
70  * The Class AutosuggestionSynchronizer.
71  */
72 public class AggregationSynchronizer extends AbstractEntitySynchronizer
73     implements IndexSynchronizer {
74
75   /**
76    * The Class RetryAggregationEntitySyncContainer.
77    */
78   private class RetryAggregationEntitySyncContainer {
79     NetworkTransaction txn;
80     AggregationEntity ae;
81
82     /**
83      * Instantiates a new retry aggregation entity sync container.
84      *
85      * @param txn the txn
86      * @param ae the se
87      */
88     public RetryAggregationEntitySyncContainer(NetworkTransaction txn, AggregationEntity ae) {
89       this.txn = txn;
90       this.ae = ae;
91     }
92
93     public NetworkTransaction getNetworkTransaction() {
94       return txn;
95     }
96
97     public AggregationEntity getAggregationEntity() {
98       return ae;
99     }
100   }
101   
102   private static final Logger LOG =
103       LoggerFactory.getInstance().getLogger(AggregationSynchronizer.class);
104   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
105
106   private boolean allWorkEnumerated;
107   private Deque<SelfLinkDescriptor> selflinks;
108   private Deque<RetryAggregationEntitySyncContainer> retryQueue;
109   private Map<String, Integer> retryLimitTracker;
110   protected ExecutorService esPutExecutor;
111   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
112   private boolean syncInProgress;
113   private Map<String, String> contextMap;
114   private String entityType;
115   private ElasticSearchSchemaConfig schemaConfig;
116   private OxmEntityLookup oxmEntityLookup;
117   
118   /**
119    * Instantiates a new entity aggregation synchronizer.
120    *
121    * @param indexName the index name
122    * @throws Exception the exception
123    */
124   public AggregationSynchronizer(String entityType, ElasticSearchSchemaConfig schemaConfig,
125       int numSyncWorkers, int numActiveInventoryWorkers, int numElasticWorkers,
126       NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig,
127       OxmEntityLookup oxmEntityLookup) throws Exception {
128     
129     super(LOG, "AGGES-" + schemaConfig.getIndexName().toUpperCase(), numSyncWorkers,
130         numActiveInventoryWorkers, numElasticWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig); 
131     
132     this.oxmEntityLookup = oxmEntityLookup;
133     
134     this.schemaConfig = schemaConfig;
135     this.entityType = entityType;
136     this.allWorkEnumerated = false;
137     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
138     this.synchronizerName = "Entity Aggregation Synchronizer";
139     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
140     this.syncInProgress = false;
141     this.allWorkEnumerated = false;
142     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
143     this.retryQueue = new ConcurrentLinkedDeque<RetryAggregationEntitySyncContainer>();
144     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
145     
146     this.esPutExecutor = NodeUtils.createNamedExecutor("AGGES-ES-PUT", 1, LOG);
147     
148     this.aaiEntityStats.intializeEntityCounters(entityType);
149     this.esEntityStats.intializeEntityCounters(entityType);
150         
151     this.contextMap = MDC.getCopyOfContextMap();
152   }
153
154   /**
155    * Collect all the work.
156    *
157    * @return the operation state
158    */
159   private OperationState collectAllTheWork() {
160     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
161     final String entity = this.getEntityType();
162     try {
163
164       aaiWorkOnHand.set(1);
165
166       supplyAsync(new Supplier<Void>() {
167
168         @Override
169         public Void get() {
170           MDC.setContextMap(contextMap);
171           OperationResult typeLinksResult = null;
172           try {
173             typeLinksResult = aaiAdapter.getSelfLinksByEntityType(entity);
174             aaiWorkOnHand.decrementAndGet();
175             processEntityTypeSelfLinks(typeLinksResult);
176           } catch (Exception exc) {
177                   LOG.error(AaiUiMsgs.ERROR_GENERIC, "Processing execption while building working set.  Error:" 
178                        + exc.getMessage());
179           }
180
181           return null;
182         }
183
184       }, aaiExecutor).whenComplete((result, error) -> {
185
186         if (error != null) {
187           LOG.error(AaiUiMsgs.ERROR_GENERIC,
188               "An error occurred getting data from AAI. Error = " + error.getMessage());
189         }
190       });
191
192       while (aaiWorkOnHand.get() != 0) {
193
194         if (LOG.isDebugEnabled()) {
195           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
196         }
197
198         Thread.sleep(1000);
199       }
200
201       aaiWorkOnHand.set(selflinks.size());
202       allWorkEnumerated = true;
203       syncEntityTypes();
204
205       while (!isSyncDone()) {
206         performRetrySync();
207         Thread.sleep(1000);
208       }
209
210       /*
211        * Make sure we don't hang on to retries that failed which could cause issues during future
212        * syncs
213        */
214       retryLimitTracker.clear();
215
216     } catch (Exception exc) {
217       // TODO -> LOG, waht should be logged here?
218     }
219
220     return OperationState.OK;
221   }
222   
223   
224   /**
225    * Perform retry sync.
226    */
227   private void performRetrySync() {
228     while (retryQueue.peek() != null) {
229
230       RetryAggregationEntitySyncContainer rsc = retryQueue.poll();
231       if (rsc != null) {
232
233         AggregationEntity ae = rsc.getAggregationEntity();
234         NetworkTransaction txn = rsc.getNetworkTransaction();
235
236         String link = null;
237         try {
238           /*
239            * In this retry flow the se object has already derived its fields
240            */
241           link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
242         } catch (Exception exc) {
243           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
244         }
245
246         if (link != null) {
247           NetworkTransaction retryTransaction = new NetworkTransaction();
248           retryTransaction.setLink(link);
249           retryTransaction.setEntityType(txn.getEntityType());
250           retryTransaction.setDescriptor(txn.getDescriptor());
251           retryTransaction.setOperationType(HttpMethod.GET);
252
253           /*
254            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
255            * called incrementAndGet when queuing the failed PUT!
256            */
257
258           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
259               esExecutor).whenComplete((result, error) -> {
260
261                 esWorkOnHand.decrementAndGet();
262
263                 if (error != null) {
264                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
265                 } else {
266                   updateElasticSearchCounters(result);
267                   performDocumentUpsert(result, ae);
268                 }
269               });
270         }
271
272       }
273     }
274   }
275   
276   /**
277    * Perform document upsert.
278    *
279    * @param esGetTxn the es get txn
280    * @param ae the ae
281    */
282   protected void performDocumentUpsert(NetworkTransaction esGetTxn, AggregationEntity ae) {
283     /**
284      * <p>
285      * <ul>
286      * As part of the response processing we need to do the following:
287      * <li>1. Extract the version (if present), it will be the ETAG when we use the
288      * Search-Abstraction-Service
289      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
290      * tag
291      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
292      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
293      * </ul>
294      * </p>
295      */
296     String link = null;
297     try {
298       link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId()); 
299     } catch (Exception exc) {
300       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
301       return;
302     }
303
304     String versionNumber = null;
305     boolean wasEntryDiscovered = false;
306     if (esGetTxn.getOperationResult().getResultCode() == 404) {
307       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, ae.getEntityPrimaryKeyValue());
308     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
309       wasEntryDiscovered = true;
310       try {
311         versionNumber = NodeUtils.extractFieldValueFromObject(
312             NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
313             "_version");
314       } catch (IOException exc) {
315         String message =
316             "Error extracting version number from response, aborting aggregation entity sync of "
317                 + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
318         LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
319         return;
320       }
321     } else {
322       /*
323        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
324        * return.
325        */
326       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
327           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
328       return;
329     }
330
331     try {
332       String jsonPayload = null;
333       if (wasEntryDiscovered) {
334         try {
335           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
336           NodeUtils.extractObjectsByKey(
337               NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
338               "_source", sourceObject);
339
340           if (!sourceObject.isEmpty()) {
341             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
342             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
343             ObjectReader updater = mapper.readerForUpdating(me);
344             MergableEntity merged = updater.readValue(ae.getAsJson());
345             jsonPayload = mapper.writeValueAsString(merged);
346           }
347         } catch (IOException exc) {
348           String message =
349               "Error extracting source value from response, aborting aggregation entity sync of "
350                   + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
351           LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
352           return;
353         }
354       } else {
355         jsonPayload = ae.getAsJson();
356       }
357
358       if (wasEntryDiscovered) {
359         if (versionNumber != null && jsonPayload != null) {
360
361           String requestPayload =
362               elasticSearchAdapter.buildBulkImportOperationRequest(schemaConfig.getIndexName(),
363                   schemaConfig.getIndexDocType(), ae.getId(), versionNumber, jsonPayload);
364
365           NetworkTransaction transactionTracker = new NetworkTransaction();
366           transactionTracker.setEntityType(esGetTxn.getEntityType());
367           transactionTracker.setDescriptor(esGetTxn.getDescriptor());
368           transactionTracker.setOperationType(HttpMethod.PUT);
369
370           esWorkOnHand.incrementAndGet();
371           supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(),
372               requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
373                   .whenComplete((result, error) -> {
374
375                     esWorkOnHand.decrementAndGet();
376
377                     if (error != null) {
378                       String message = "Aggregation entity sync UPDATE PUT error - "
379                           + error.getLocalizedMessage();
380                       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
381                     } else {
382                       updateElasticSearchCounters(result);
383                       processStoreDocumentResult(result, esGetTxn, ae);
384                     }
385                   });
386         }
387
388       } else {
389         if (link != null && jsonPayload != null) {
390
391           NetworkTransaction updateElasticTxn = new NetworkTransaction();
392           updateElasticTxn.setLink(link);
393           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
394           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
395           updateElasticTxn.setOperationType(HttpMethod.PUT);
396
397           esWorkOnHand.incrementAndGet();
398           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
399               esPutExecutor).whenComplete((result, error) -> {
400
401                 esWorkOnHand.decrementAndGet();
402
403                 if (error != null) {
404                   String message =
405                       "Aggregation entity sync UPDATE PUT error - " + error.getLocalizedMessage();
406                   LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
407                 } else {
408                   updateElasticSearchCounters(result);
409                   processStoreDocumentResult(result, esGetTxn, ae);
410                 }
411               });
412         }
413       }
414     } catch (Exception exc) {
415       String message = "Exception caught during aggregation entity sync PUT operation. Message - "
416           + exc.getLocalizedMessage();
417       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
418     }
419   }
420   
421   /**
422    * Should allow retry.
423    *
424    * @param id the id
425    * @return true, if successful
426    */
427   private boolean shouldAllowRetry(String id) {
428     boolean isRetryAllowed = true;
429     if (retryLimitTracker.get(id) != null) {
430       Integer currentCount = retryLimitTracker.get(id);
431       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
432         isRetryAllowed = false;
433         String message = "Aggregation entity re-sync limit reached for " + id
434             + ", re-sync will no longer be attempted for this entity";
435         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
436       } else {
437         Integer newCount = new Integer(currentCount.intValue() + 1);
438         retryLimitTracker.put(id, newCount);
439       }
440     } else {
441       Integer firstRetryCount = new Integer(1);
442       retryLimitTracker.put(id, firstRetryCount);
443     }
444
445     return isRetryAllowed;
446   }
447   
448   /**
449    * Process store document result.
450    *
451    * @param esPutResult the es put result
452    * @param esGetResult the es get result
453    * @param ae the ae
454    */
455   private void processStoreDocumentResult(NetworkTransaction esPutResult,
456       NetworkTransaction esGetResult, AggregationEntity ae) {
457
458     OperationResult or = esPutResult.getOperationResult();
459
460     if (!or.wasSuccessful()) {
461       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
462
463         if (shouldAllowRetry(ae.getId())) {
464           esWorkOnHand.incrementAndGet();
465
466           RetryAggregationEntitySyncContainer rsc =
467               new RetryAggregationEntitySyncContainer(esGetResult, ae);
468           retryQueue.push(rsc);
469
470           String message = "Store document failed during aggregation entity synchronization"
471               + " due to version conflict. Entity will be re-synced.";
472           LOG.warn(AaiUiMsgs.ERROR_GENERIC, message);
473         }
474       } else {
475         String message =
476             "Store document failed during aggregation entity synchronization with result code "
477                 + or.getResultCode() + " and result message " + or.getResult();
478         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
479       }
480     }
481   }
482   
483   /**
484    * Sync entity types.
485    */
486   private void syncEntityTypes() {
487
488     while (selflinks.peek() != null) {
489
490       SelfLinkDescriptor linkDescriptor = selflinks.poll();
491       aaiWorkOnHand.decrementAndGet();
492
493       OxmEntityDescriptor descriptor = null;
494
495       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
496
497         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
498
499         if (descriptor == null) {
500           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
501           // go to next element in iterator
502           continue;
503         }
504
505         NetworkTransaction txn = new NetworkTransaction();
506         txn.setDescriptor(descriptor);
507         txn.setLink(linkDescriptor.getSelfLink());
508         txn.setOperationType(HttpMethod.GET);
509         txn.setEntityType(linkDescriptor.getEntityType());
510
511         aaiWorkOnHand.incrementAndGet();
512
513         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
514             .whenComplete((result, error) -> {
515
516               aaiWorkOnHand.decrementAndGet();
517
518               if (error != null) {
519                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
520               } else {
521                 if (result == null) {
522                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
523                       linkDescriptor.getSelfLink());
524                 } else {
525                   updateActiveInventoryCounters(result);
526                   fetchDocumentForUpsert(result);
527                 }
528               }
529             });
530       }
531
532     }
533
534   }
535   
536   /**
537    * Fetch document for upsert.
538    *
539    * @param txn the txn
540    */
541   private void fetchDocumentForUpsert(NetworkTransaction txn) {
542     // modified
543     if (!txn.getOperationResult().wasSuccessful()) {
544       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
545       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
546       return;
547     }
548
549     try {
550         final String jsonResult = txn.getOperationResult().getResult();
551         if (jsonResult != null && jsonResult.length() > 0) {
552
553           AggregationEntity ae = new AggregationEntity();
554           ae.setLink(ActiveInventoryAdapter.extractResourcePath(txn.getLink()));
555           populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
556           ae.deriveFields();
557
558           String link = null;
559           try {
560             link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
561           } catch (Exception exc) {
562             LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
563           }
564
565           if (link != null) {
566             NetworkTransaction n2 = new NetworkTransaction();
567             n2.setLink(link);
568             n2.setEntityType(txn.getEntityType());
569             n2.setDescriptor(txn.getDescriptor());
570             n2.setOperationType(HttpMethod.GET);
571
572             esWorkOnHand.incrementAndGet();
573
574             supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
575                 .whenComplete((result, error) -> {
576
577                   esWorkOnHand.decrementAndGet();
578
579                   if (error != null) {
580                     LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
581                   } else {
582                     updateElasticSearchCounters(result);
583                     performDocumentUpsert(result, ae);
584                   }
585                 });
586           }
587         }
588
589                 } catch (JsonProcessingException exc) {
590                         LOG.error(AaiUiMsgs.ERROR_GENERIC,
591                                         "There was a JSON processing error fetching the elastic document for upsert.  Error: "
592                                                         + exc.getMessage());
593                 } catch (IOException exc) {
594                         LOG.error(AaiUiMsgs.ERROR_GENERIC,
595                                         "There was an IO error fetching the elastic document for upsert.  Error: " + exc.getMessage());
596                 }
597           }
598   
599   
600   /**
601    * Populate aggregation entity document.
602    *
603    * @param doc the doc
604    * @param result the result
605    * @param resultDescriptor the result descriptor
606    * @throws JsonProcessingException the json processing exception
607    * @throws IOException Signals that an I/O exception has occurred.
608    */
609   protected void populateAggregationEntityDocument(AggregationEntity doc, String result,
610       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
611     doc.setEntityType(resultDescriptor.getEntityName());
612     JsonNode entityNode = mapper.readTree(result);
613     Map<String, Object> map = mapper.convertValue(entityNode, Map.class);
614     doc.copyAttributeKeyValuePair(map);
615   }
616   
617   /**
618    * Process entity type self links.
619    *
620    * @param operationResult the operation result
621    */
622   private void processEntityTypeSelfLinks(OperationResult operationResult) {
623
624     JsonNode rootNode = null;
625     
626     if ( operationResult == null ) {
627         return;
628     }
629
630     final String jsonResult = operationResult.getResult();
631
632     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
633
634       try {
635         rootNode = mapper.readTree(jsonResult);
636       } catch (IOException exc) {
637         String message =
638             "Could not deserialize JSON (representing operation result) as node tree. " +
639             "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
640         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
641       }
642
643       JsonNode resultData = rootNode.get("result-data");
644       ArrayNode resultDataArrayNode = null;
645
646       if (resultData.isArray()) {
647         resultDataArrayNode = (ArrayNode) resultData;
648
649         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
650         JsonNode element = null;
651
652         while (elementIterator.hasNext()) {
653           element = elementIterator.next();
654
655           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
656           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
657
658           OxmEntityDescriptor descriptor = null;
659
660           if (resourceType != null && resourceLink != null) {
661
662             descriptor = oxmEntityLookup.getEntityDescriptors().get(resourceType);
663
664             if (descriptor == null) {
665               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
666               // go to next element in iterator
667               continue;
668             }
669
670             selflinks.add(new SelfLinkDescriptor(resourceLink, SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
671             
672
673           }
674         }
675       }
676     }
677
678   }
679
680   /*
681    * (non-Javadoc)
682    * 
683    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
684    */
685   @Override
686   public OperationState doSync() {
687     this.syncDurationInMs = -1;
688     syncStartedTimeStampInMs = System.currentTimeMillis();
689     String txnID = NodeUtils.getRandomTxnId();
690     MdcContext.initialize(txnID, "AggregationSynchronizer", "", "Sync", "");
691     
692     return collectAllTheWork();
693   }
694
695   @Override
696   public SynchronizerState getState() {
697
698     if (!isSyncDone()) {
699       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
700     }
701
702     return SynchronizerState.IDLE;
703
704   }
705
706   /*
707    * (non-Javadoc)
708    * 
709    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
710    */
711   @Override
712   public String getStatReport(boolean showFinalReport) {
713     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
714     return getStatReport(syncDurationInMs, showFinalReport);
715   }
716
717   public String getEntityType() {
718     return entityType;
719   }
720
721   public void setEntityType(String entityType) {
722     this.entityType = entityType;
723   }
724
725   /*
726    * (non-Javadoc)
727    * 
728    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
729    */
730   @Override
731   public void shutdown() {
732     this.shutdownExecutors();
733   }
734
735   @Override
736   protected boolean isSyncDone() {
737
738     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
739
740     if (LOG.isDebugEnabled()) {
741       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
742           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
743     }
744
745     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
746       return false;
747     }
748
749     this.syncInProgress = false;
750
751     return true;
752   }
753
754   /*
755    * (non-Javadoc)
756    * 
757    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
758    */
759   @Override
760   public void clearCache() {
761
762     if (syncInProgress) {
763       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
764           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
765       return;
766     }
767
768     super.clearCache();
769     this.resetCounters();
770     if (entityCounters != null) {
771       entityCounters.clear();
772     }
773
774     allWorkEnumerated = false;
775
776   }
777
778 }