Adding more sparky junit coverage
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / aggregation / sync / AggregationSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.aggregation.sync;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Deque;
30 import java.util.EnumSet;
31 import java.util.Iterator;
32 import java.util.Map;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentLinkedDeque;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import java.util.function.Supplier;
38
39 import org.onap.aai.cl.api.Logger;
40 import org.onap.aai.cl.eelf.LoggerFactory;
41 import org.onap.aai.cl.mdc.MdcContext;
42 import org.onap.aai.restclient.client.OperationResult;
43 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
44 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
45 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
46 import org.onap.aai.sparky.dal.NetworkTransaction;
47 import org.onap.aai.sparky.dal.rest.HttpMethod;
48 import org.onap.aai.sparky.logging.AaiUiMsgs;
49 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
50 import org.onap.aai.sparky.sync.IndexSynchronizer;
51 import org.onap.aai.sparky.sync.SynchronizerConstants;
52 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
53 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
54 import org.onap.aai.sparky.sync.entity.AggregationEntity;
55 import org.onap.aai.sparky.sync.entity.MergableEntity;
56 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
57 import org.onap.aai.sparky.sync.enumeration.OperationState;
58 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
59 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
60 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
61 import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
62 import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
63 import org.onap.aai.sparky.util.NodeUtils;
64 import org.slf4j.MDC;
65
66 import com.fasterxml.jackson.core.JsonProcessingException;
67 import com.fasterxml.jackson.databind.JsonNode;
68 import com.fasterxml.jackson.databind.ObjectReader;
69 import com.fasterxml.jackson.databind.node.ArrayNode;
70
71 /**
72  * The Class AutosuggestionSynchronizer.
73  */
74 public class AggregationSynchronizer extends AbstractEntitySynchronizer
75     implements IndexSynchronizer {
76
77   /**
78    * The Class RetryAggregationEntitySyncContainer.
79    */
80   private class RetryAggregationEntitySyncContainer {
81     NetworkTransaction txn;
82     AggregationEntity ae;
83
84     /**
85      * Instantiates a new retry aggregation entity sync container.
86      *
87      * @param txn the txn
88      * @param ae the se
89      */
90     public RetryAggregationEntitySyncContainer(NetworkTransaction txn, AggregationEntity ae) {
91       this.txn = txn;
92       this.ae = ae;
93     }
94
95     public NetworkTransaction getNetworkTransaction() {
96       return txn;
97     }
98
99     public AggregationEntity getAggregationEntity() {
100       return ae;
101     }
102   }
103   
104   private static final Logger LOG =
105       LoggerFactory.getInstance().getLogger(AggregationSynchronizer.class);
106   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
107
108   private boolean allWorkEnumerated;
109   private Deque<SelfLinkDescriptor> selflinks;
110   private Deque<RetryAggregationEntitySyncContainer> retryQueue;
111   private Map<String, Integer> retryLimitTracker;
112   protected ExecutorService esPutExecutor;
113   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
114   private boolean syncInProgress;
115   private Map<String, String> contextMap;
116   private String entityType;
117   private ElasticSearchSchemaConfig schemaConfig;
118   private OxmEntityLookup oxmEntityLookup;
119   
120   /**
121    * Instantiates a new entity aggregation synchronizer.
122    *
123    * @param indexName the index name
124    * @throws Exception the exception
125    */
126   public AggregationSynchronizer(String entityType, ElasticSearchSchemaConfig schemaConfig,
127       int numSyncWorkers, int numActiveInventoryWorkers, int numElasticWorkers,
128       NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig,
129       OxmEntityLookup oxmEntityLookup) throws Exception {
130     
131     super(LOG, "AGGES-" + schemaConfig.getIndexName().toUpperCase(), numSyncWorkers,
132         numActiveInventoryWorkers, numElasticWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig); 
133     
134     this.oxmEntityLookup = oxmEntityLookup;
135     
136     this.schemaConfig = schemaConfig;
137     this.entityType = entityType;
138     this.allWorkEnumerated = false;
139     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
140     this.synchronizerName = "Entity Aggregation Synchronizer";
141     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
142     this.syncInProgress = false;
143     this.allWorkEnumerated = false;
144     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
145     this.retryQueue = new ConcurrentLinkedDeque<RetryAggregationEntitySyncContainer>();
146     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
147     
148     this.esPutExecutor = NodeUtils.createNamedExecutor("AGGES-ES-PUT", 1, LOG);
149     
150     this.aaiEntityStats.intializeEntityCounters(entityType);
151     this.esEntityStats.intializeEntityCounters(entityType);
152         
153     this.contextMap = MDC.getCopyOfContextMap();
154   }
155
156   /**
157    * Collect all the work.
158    *
159    * @return the operation state
160    */
161   private OperationState collectAllTheWork() {
162     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
163     final String entity = this.getEntityType();
164     try {
165
166       aaiWorkOnHand.set(1);
167
168       supplyAsync(new Supplier<Void>() {
169
170         @Override
171         public Void get() {
172           MDC.setContextMap(contextMap);
173           OperationResult typeLinksResult = null;
174           try {
175             typeLinksResult = aaiAdapter.getSelfLinksByEntityType(entity);
176             aaiWorkOnHand.decrementAndGet();
177             processEntityTypeSelfLinks(typeLinksResult);
178           } catch (Exception exc) {
179                   LOG.error(AaiUiMsgs.ERROR_GENERIC, "Processing execption while building working set.  Error:" 
180                        + exc.getMessage());
181           }
182
183           return null;
184         }
185
186       }, aaiExecutor).whenComplete((result, error) -> {
187
188         if (error != null) {
189           LOG.error(AaiUiMsgs.ERROR_GENERIC,
190               "An error occurred getting data from AAI. Error = " + error.getMessage());
191         }
192       });
193
194       while (aaiWorkOnHand.get() != 0) {
195
196         if (LOG.isDebugEnabled()) {
197           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
198         }
199
200         Thread.sleep(1000);
201       }
202
203       aaiWorkOnHand.set(selflinks.size());
204       allWorkEnumerated = true;
205       syncEntityTypes();
206
207       while (!isSyncDone()) {
208         performRetrySync();
209         Thread.sleep(1000);
210       }
211
212       /*
213        * Make sure we don't hang on to retries that failed which could cause issues during future
214        * syncs
215        */
216       retryLimitTracker.clear();
217
218     } catch (Exception exc) {
219       // TODO -> LOG, waht should be logged here?
220     }
221
222     return OperationState.OK;
223   }
224   
225   
226   /**
227    * Perform retry sync.
228    */
229   private void performRetrySync() {
230     while (retryQueue.peek() != null) {
231
232       RetryAggregationEntitySyncContainer rsc = retryQueue.poll();
233       if (rsc != null) {
234
235         AggregationEntity ae = rsc.getAggregationEntity();
236         NetworkTransaction txn = rsc.getNetworkTransaction();
237
238         String link = null;
239         try {
240           /*
241            * In this retry flow the se object has already derived its fields
242            */
243           link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
244         } catch (Exception exc) {
245           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
246         }
247
248         if (link != null) {
249           NetworkTransaction retryTransaction = new NetworkTransaction();
250           retryTransaction.setLink(link);
251           retryTransaction.setEntityType(txn.getEntityType());
252           retryTransaction.setDescriptor(txn.getDescriptor());
253           retryTransaction.setOperationType(HttpMethod.GET);
254
255           /*
256            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
257            * called incrementAndGet when queuing the failed PUT!
258            */
259
260           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
261               esExecutor).whenComplete((result, error) -> {
262
263                 esWorkOnHand.decrementAndGet();
264
265                 if (error != null) {
266                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
267                 } else {
268                   updateElasticSearchCounters(result);
269                   performDocumentUpsert(result, ae);
270                 }
271               });
272         }
273
274       }
275     }
276   }
277   
278   /**
279    * Perform document upsert.
280    *
281    * @param esGetTxn the es get txn
282    * @param ae the ae
283    */
284   protected void performDocumentUpsert(NetworkTransaction esGetTxn, AggregationEntity ae) {
285     /**
286      * <p>
287      * <ul>
288      * As part of the response processing we need to do the following:
289      * <li>1. Extract the version (if present), it will be the ETAG when we use the
290      * Search-Abstraction-Service
291      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
292      * tag
293      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
294      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
295      * </ul>
296      * </p>
297      */
298     String link = null;
299     try {
300       link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId()); 
301     } catch (Exception exc) {
302       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
303       return;
304     }
305
306     String versionNumber = null;
307     boolean wasEntryDiscovered = false;
308     if (esGetTxn.getOperationResult().getResultCode() == 404) {
309       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, ae.getEntityPrimaryKeyValue());
310     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
311       wasEntryDiscovered = true;
312       try {
313         versionNumber = NodeUtils.extractFieldValueFromObject(
314             NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
315             "_version");
316       } catch (IOException exc) {
317         String message =
318             "Error extracting version number from response, aborting aggregation entity sync of "
319                 + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
320         LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
321         return;
322       }
323     } else {
324       /*
325        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
326        * return.
327        */
328       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
329           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
330       return;
331     }
332
333     try {
334       String jsonPayload = null;
335       if (wasEntryDiscovered) {
336         try {
337           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
338           NodeUtils.extractObjectsByKey(
339               NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
340               "_source", sourceObject);
341
342           if (!sourceObject.isEmpty()) {
343             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
344             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
345             ObjectReader updater = mapper.readerForUpdating(me);
346             MergableEntity merged = updater.readValue(ae.getAsJson());
347             jsonPayload = mapper.writeValueAsString(merged);
348           }
349         } catch (IOException exc) {
350           String message =
351               "Error extracting source value from response, aborting aggregation entity sync of "
352                   + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
353           LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
354           return;
355         }
356       } else {
357         jsonPayload = ae.getAsJson();
358       }
359
360       if (wasEntryDiscovered) {
361         if (versionNumber != null && jsonPayload != null) {
362
363           String requestPayload =
364               elasticSearchAdapter.buildBulkImportOperationRequest(schemaConfig.getIndexName(),
365                   schemaConfig.getIndexDocType(), ae.getId(), versionNumber, jsonPayload);
366
367           NetworkTransaction transactionTracker = new NetworkTransaction();
368           transactionTracker.setEntityType(esGetTxn.getEntityType());
369           transactionTracker.setDescriptor(esGetTxn.getDescriptor());
370           transactionTracker.setOperationType(HttpMethod.PUT);
371
372           esWorkOnHand.incrementAndGet();
373           supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(),
374               requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
375                   .whenComplete((result, error) -> {
376
377                     esWorkOnHand.decrementAndGet();
378
379                     if (error != null) {
380                       String message = "Aggregation entity sync UPDATE PUT error - "
381                           + error.getLocalizedMessage();
382                       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
383                     } else {
384                       updateElasticSearchCounters(result);
385                       processStoreDocumentResult(result, esGetTxn, ae);
386                     }
387                   });
388         }
389
390       } else {
391         if (link != null && jsonPayload != null) {
392
393           NetworkTransaction updateElasticTxn = new NetworkTransaction();
394           updateElasticTxn.setLink(link);
395           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
396           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
397           updateElasticTxn.setOperationType(HttpMethod.PUT);
398
399           esWorkOnHand.incrementAndGet();
400           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
401               esPutExecutor).whenComplete((result, error) -> {
402
403                 esWorkOnHand.decrementAndGet();
404
405                 if (error != null) {
406                   String message =
407                       "Aggregation entity sync UPDATE PUT error - " + error.getLocalizedMessage();
408                   LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
409                 } else {
410                   updateElasticSearchCounters(result);
411                   processStoreDocumentResult(result, esGetTxn, ae);
412                 }
413               });
414         }
415       }
416     } catch (Exception exc) {
417       String message = "Exception caught during aggregation entity sync PUT operation. Message - "
418           + exc.getLocalizedMessage();
419       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
420     }
421   }
422   
423   /**
424    * Should allow retry.
425    *
426    * @param id the id
427    * @return true, if successful
428    */
429   private boolean shouldAllowRetry(String id) {
430     boolean isRetryAllowed = true;
431     if (retryLimitTracker.get(id) != null) {
432       Integer currentCount = retryLimitTracker.get(id);
433       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
434         isRetryAllowed = false;
435         String message = "Aggregation entity re-sync limit reached for " + id
436             + ", re-sync will no longer be attempted for this entity";
437         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
438       } else {
439         Integer newCount = new Integer(currentCount.intValue() + 1);
440         retryLimitTracker.put(id, newCount);
441       }
442     } else {
443       Integer firstRetryCount = new Integer(1);
444       retryLimitTracker.put(id, firstRetryCount);
445     }
446
447     return isRetryAllowed;
448   }
449   
450   /**
451    * Process store document result.
452    *
453    * @param esPutResult the es put result
454    * @param esGetResult the es get result
455    * @param ae the ae
456    */
457   private void processStoreDocumentResult(NetworkTransaction esPutResult,
458       NetworkTransaction esGetResult, AggregationEntity ae) {
459
460     OperationResult or = esPutResult.getOperationResult();
461
462     if (!or.wasSuccessful()) {
463       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
464
465         if (shouldAllowRetry(ae.getId())) {
466           esWorkOnHand.incrementAndGet();
467
468           RetryAggregationEntitySyncContainer rsc =
469               new RetryAggregationEntitySyncContainer(esGetResult, ae);
470           retryQueue.push(rsc);
471
472           String message = "Store document failed during aggregation entity synchronization"
473               + " due to version conflict. Entity will be re-synced.";
474           LOG.warn(AaiUiMsgs.ERROR_GENERIC, message);
475         }
476       } else {
477         String message =
478             "Store document failed during aggregation entity synchronization with result code "
479                 + or.getResultCode() + " and result message " + or.getResult();
480         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
481       }
482     }
483   }
484   
485   /**
486    * Sync entity types.
487    */
488   private void syncEntityTypes() {
489
490     while (selflinks.peek() != null) {
491
492       SelfLinkDescriptor linkDescriptor = selflinks.poll();
493       aaiWorkOnHand.decrementAndGet();
494
495       OxmEntityDescriptor descriptor = null;
496
497       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
498
499         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
500
501         if (descriptor == null) {
502           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
503           // go to next element in iterator
504           continue;
505         }
506
507         NetworkTransaction txn = new NetworkTransaction();
508         txn.setDescriptor(descriptor);
509         txn.setLink(linkDescriptor.getSelfLink());
510         txn.setOperationType(HttpMethod.GET);
511         txn.setEntityType(linkDescriptor.getEntityType());
512
513         aaiWorkOnHand.incrementAndGet();
514
515         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
516             .whenComplete((result, error) -> {
517
518               aaiWorkOnHand.decrementAndGet();
519
520               if (error != null) {
521                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
522               } else {
523                 if (result == null) {
524                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
525                       linkDescriptor.getSelfLink());
526                 } else {
527                   updateActiveInventoryCounters(result);
528                   fetchDocumentForUpsert(result);
529                 }
530               }
531             });
532       }
533
534     }
535
536   }
537   
538   /**
539    * Fetch document for upsert.
540    *
541    * @param txn the txn
542    */
543   private void fetchDocumentForUpsert(NetworkTransaction txn) {
544     // modified
545     if (!txn.getOperationResult().wasSuccessful()) {
546       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
547       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
548       return;
549     }
550
551     try {
552         final String jsonResult = txn.getOperationResult().getResult();
553         if (jsonResult != null && jsonResult.length() > 0) {
554
555           AggregationEntity ae = new AggregationEntity();
556           ae.setLink(ActiveInventoryAdapter.extractResourcePath(txn.getLink()));
557           populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
558           ae.deriveFields();
559
560           String link = null;
561           try {
562             link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
563           } catch (Exception exc) {
564             LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
565           }
566
567           if (link != null) {
568             NetworkTransaction n2 = new NetworkTransaction();
569             n2.setLink(link);
570             n2.setEntityType(txn.getEntityType());
571             n2.setDescriptor(txn.getDescriptor());
572             n2.setOperationType(HttpMethod.GET);
573
574             esWorkOnHand.incrementAndGet();
575
576             supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
577                 .whenComplete((result, error) -> {
578
579                   esWorkOnHand.decrementAndGet();
580
581                   if (error != null) {
582                     LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
583                   } else {
584                     updateElasticSearchCounters(result);
585                     performDocumentUpsert(result, ae);
586                   }
587                 });
588           }
589         }
590
591                 } catch (JsonProcessingException exc) {
592                         LOG.error(AaiUiMsgs.ERROR_GENERIC,
593                                         "There was a JSON processing error fetching the elastic document for upsert.  Error: "
594                                                         + exc.getMessage());
595                 } catch (IOException exc) {
596                         LOG.error(AaiUiMsgs.ERROR_GENERIC,
597                                         "There was an IO error fetching the elastic document for upsert.  Error: " + exc.getMessage());
598                 }
599           }
600   
601   
602   /**
603    * Populate aggregation entity document.
604    *
605    * @param doc the doc
606    * @param result the result
607    * @param resultDescriptor the result descriptor
608    * @throws JsonProcessingException the json processing exception
609    * @throws IOException Signals that an I/O exception has occurred.
610    */
611   protected void populateAggregationEntityDocument(AggregationEntity doc, String result,
612       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
613     doc.setEntityType(resultDescriptor.getEntityName());
614     JsonNode entityNode = mapper.readTree(result);
615     Map<String, Object> map = mapper.convertValue(entityNode, Map.class);
616     doc.copyAttributeKeyValuePair(map);
617   }
618   
619   /**
620    * Process entity type self links.
621    *
622    * @param operationResult the operation result
623    */
624   private void processEntityTypeSelfLinks(OperationResult operationResult) {
625
626     JsonNode rootNode = null;
627     
628     if ( operationResult == null ) {
629         return;
630     }
631
632     final String jsonResult = operationResult.getResult();
633
634     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
635
636       try {
637         rootNode = mapper.readTree(jsonResult);
638       } catch (IOException exc) {
639         String message =
640             "Could not deserialize JSON (representing operation result) as node tree. " +
641             "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
642         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
643       }
644
645       JsonNode resultData = rootNode.get("result-data");
646       ArrayNode resultDataArrayNode = null;
647
648       if (resultData.isArray()) {
649         resultDataArrayNode = (ArrayNode) resultData;
650
651         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
652         JsonNode element = null;
653
654         while (elementIterator.hasNext()) {
655           element = elementIterator.next();
656
657           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
658           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
659
660           OxmEntityDescriptor descriptor = null;
661
662           if (resourceType != null && resourceLink != null) {
663
664             descriptor = oxmEntityLookup.getEntityDescriptors().get(resourceType);
665
666             if (descriptor == null) {
667               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
668               // go to next element in iterator
669               continue;
670             }
671
672             selflinks.add(new SelfLinkDescriptor(resourceLink, SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
673             
674
675           }
676         }
677       }
678     }
679
680   }
681
682   /*
683    * (non-Javadoc)
684    * 
685    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
686    */
687   @Override
688   public OperationState doSync() {
689     this.syncDurationInMs = -1;
690     syncStartedTimeStampInMs = System.currentTimeMillis();
691     String txnID = NodeUtils.getRandomTxnId();
692     MdcContext.initialize(txnID, "AggregationSynchronizer", "", "Sync", "");
693     
694     return collectAllTheWork();
695   }
696
697   @Override
698   public SynchronizerState getState() {
699
700     if (!isSyncDone()) {
701       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
702     }
703
704     return SynchronizerState.IDLE;
705
706   }
707
708   /*
709    * (non-Javadoc)
710    * 
711    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
712    */
713   @Override
714   public String getStatReport(boolean showFinalReport) {
715     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
716     return getStatReport(syncDurationInMs, showFinalReport);
717   }
718
719   public String getEntityType() {
720     return entityType;
721   }
722
723   public void setEntityType(String entityType) {
724     this.entityType = entityType;
725   }
726
727   /*
728    * (non-Javadoc)
729    * 
730    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
731    */
732   @Override
733   public void shutdown() {
734     this.shutdownExecutors();
735   }
736
737   @Override
738   protected boolean isSyncDone() {
739
740     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
741
742     if (LOG.isDebugEnabled()) {
743       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
744           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
745     }
746
747     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
748       return false;
749     }
750
751     this.syncInProgress = false;
752
753     return true;
754   }
755
756   /*
757    * (non-Javadoc)
758    * 
759    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
760    */
761   @Override
762   public void clearCache() {
763
764     if (syncInProgress) {
765       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
766           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
767       return;
768     }
769
770     super.clearCache();
771     this.resetCounters();
772     if (entityCounters != null) {
773       entityCounters.clear();
774     }
775
776     allWorkEnumerated = false;
777
778   }
779
780 }