2 * ============LICENSE_START===================================================
3 * SPARKY (AAI UI service)
4 * ============================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=====================================================
22 * ECOMP and OpenECOMP are trademarks
23 * and service marks of AT&T Intellectual Property.
26 package org.openecomp.sparky.synchronizer;
28 import static java.util.concurrent.CompletableFuture.supplyAsync;
30 import org.openecomp.cl.mdc.MdcContext;
32 import org.openecomp.cl.mdc.MdcContext;
34 import com.fasterxml.jackson.core.JsonProcessingException;
35 import com.fasterxml.jackson.databind.JsonNode;
36 import com.fasterxml.jackson.databind.ObjectReader;
37 import com.fasterxml.jackson.databind.node.ArrayNode;
39 import java.io.IOException;
40 import java.net.InetAddress;
41 import java.net.UnknownHostException;
42 import java.util.ArrayList;
43 import java.util.Collection;
44 import java.util.Deque;
45 import java.util.Iterator;
46 import java.util.List;
48 import java.util.concurrent.ConcurrentHashMap;
49 import java.util.concurrent.ConcurrentLinkedDeque;
50 import java.util.concurrent.ExecutorService;
51 import java.util.function.Supplier;
53 import org.openecomp.cl.api.Logger;
54 import org.openecomp.cl.eelf.LoggerFactory;
55 import org.openecomp.sparky.config.oxm.OxmEntityDescriptor;
56 import org.openecomp.sparky.dal.NetworkTransaction;
57 import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig;
58 import org.openecomp.sparky.dal.rest.HttpMethod;
59 import org.openecomp.sparky.dal.rest.OperationResult;
60 import org.openecomp.sparky.logging.AaiUiMsgs;
61 import org.openecomp.sparky.synchronizer.config.SynchronizerConfiguration;
62 import org.openecomp.sparky.synchronizer.entity.MergableEntity;
63 import org.openecomp.sparky.synchronizer.entity.SearchableEntity;
64 import org.openecomp.sparky.synchronizer.entity.SelfLinkDescriptor;
65 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
66 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
67 import org.openecomp.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
68 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchPut;
69 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchRetrieval;
70 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchUpdate;
71 import org.openecomp.sparky.util.NodeUtils;
75 * The Class SearchableEntitySynchronizer.
77 public class SearchableEntitySynchronizer extends AbstractEntitySynchronizer
78 implements IndexSynchronizer {
81 * The Class RetrySearchableEntitySyncContainer.
83 private class RetrySearchableEntitySyncContainer {
84 NetworkTransaction txn;
88 * Instantiates a new retry searchable entity sync container.
93 public RetrySearchableEntitySyncContainer(NetworkTransaction txn, SearchableEntity se) {
98 public NetworkTransaction getNetworkTransaction() {
102 public SearchableEntity getSearchableEntity() {
// Logger of this class; it is also passed to the superclass constructor and to the
// named-executor factory used in the constructor.
107 private static final Logger LOG =
108 LoggerFactory.getInstance().getLogger(SearchableEntitySynchronizer.class);
// Set to true by collectAllTheWork() right after the number of queued self-links has been
// published to aaiWorkOnHand; isSyncDone() reads it together with the work counters.
110 private boolean allWorkEnumerated;
// Self-links queued by processEntityTypeSelfLinks() and drained by syncEntityTypes().
111 private Deque<SelfLinkDescriptor> selflinks;
// Stores awaiting a retry: filled by processStoreDocumentResult(), drained by performRetrySync().
112 private Deque<RetrySearchableEntitySyncContainer> retryQueue;
// Retry attempts per entity id; maintained by shouldAllowRetry() and cleared at the end of
// collectAllTheWork().
113 private Map<String, Integer> retryLimitTracker;
// Executor on which performDocumentUpsert() runs its Elasticsearch PUT / bulk-update tasks.
114 protected ExecutorService esPutExecutor;
/**
 * Instantiates a new searchable entity synchronizer.
 *
 * <p>Creates the self-link queue, the retry queue and the retry counter map, sets up the
 * "SES-ES-PUT" executor and initializes both statistics collectors from the searchable entity
 * descriptors of the OXM model.
 *
 * @param indexName the index name
 * @throws Exception the exception
 */
public SearchableEntitySynchronizer(String indexName) throws Exception {
  super(LOG, "SES", 2, 5, 5, indexName);

  // Work-tracking state.
  this.allWorkEnumerated = false;
  this.selflinks = new ConcurrentLinkedDeque<>();
  this.retryQueue = new ConcurrentLinkedDeque<>();
  this.retryLimitTracker = new ConcurrentHashMap<>();

  this.synchronizerName = "Searchable Entity Synchronizer";
  this.esPutExecutor = NodeUtils.createNamedExecutor("SES-ES-PUT", 5, LOG);

  // Both collectors start from the same set of searchable entity descriptors.
  this.aaiEntityStats
      .initializeCountersFromOxmEntityDescriptors(oxmModelLoader.getSearchableEntityDescriptors());
  this.esEntityStats
      .initializeCountersFromOxmEntityDescriptors(oxmModelLoader.getSearchableEntityDescriptors());
}
137 * Collect all the work.
139 * @return the operation state
// Drives one synchronization pass:
//   1. asks AAI, on aaiExecutor, for the self-links of every searchable entity type of the
//      OXM model and queues them through processEntityTypeSelfLinks();
//   2. loops until aaiWorkOnHand reaches zero, then publishes selflinks.size() as the new
//      aaiWorkOnHand value and sets allWorkEnumerated;
//   3. loops until isSyncDone() reports completion and finally clears the retry counters.
// Returns OperationState.ERROR when the model has no searchable entity descriptors and
// OperationState.OK in every other case, including when the final catch swallowed an exception.
// NOTE(review): the embedded numbering has gaps (for example 198 -> 204, 205 -> 208 and
// 208 -> 214). The statements on those lines, presumably the pause inside the wait loops and
// the calls that start the per-link and retry work, are not visible here; confirm against the
// complete file.
141 private OperationState collectAllTheWork() {
// Snapshot of the caller's MDC logging context; it is installed again inside the worker below.
142 final Map<String, String> contextMap = MDC.getCopyOfContextMap();
143 Map<String, OxmEntityDescriptor> descriptorMap =
144 oxmModelLoader.getSearchableEntityDescriptors();
146 if (descriptorMap.isEmpty()) {
// NOTE(review): the same message is emitted twice, once at ERROR and once at INFO level.
147 LOG.error(AaiUiMsgs.ERROR_LOADING_OXM_SEARCHABLE_ENTITIES);
148 LOG.info(AaiUiMsgs.ERROR_LOADING_OXM_SEARCHABLE_ENTITIES);
149 return OperationState.ERROR;
152 Collection<String> syncTypes = descriptorMap.keySet();
154 /*Collection<String> syncTypes = new ArrayList<String>();
155 syncTypes.add("service-instance");*/
160 * launch a parallel async thread to process the documents for each entity-type (to max the
161 * of the configured executor anyway)
// One unit of outstanding AAI work per entity type.
164 aaiWorkOnHand.set(syncTypes.size());
166 for (String key : syncTypes) {
168 supplyAsync(new Supplier<Void>() {
172 MDC.setContextMap(contextMap);
173 OperationResult typeLinksResult = null;
// NOTE(review): as listed, the counter is decremented only after the AAI call returned
// normally. If getSelfLinksByEntityType() throws, the decrement is skipped and the loop
// further down that waits for aaiWorkOnHand to reach zero cannot terminate. Consider moving
// the decrement into a finally block.
175 typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(key);
176 aaiWorkOnHand.decrementAndGet();
177 processEntityTypeSelfLinks(typeLinksResult);
178 } catch (Exception exc) {
// NOTE(review): the exception is discarded without being logged.
179 // TODO -> LOG, what should be logged here?
185 }, aaiExecutor).whenComplete((result, error) -> {
188 LOG.error(AaiUiMsgs.ERROR_GENERIC,
189 "An error occurred getting data from AAI. Error = " + error.getMessage());
// Wait until every per-type self-link query has reported back.
195 while (aaiWorkOnHand.get() != 0) {
197 if (LOG.isDebugEnabled()) {
198 LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
// From here on aaiWorkOnHand counts queued self-links instead of entity types.
204 aaiWorkOnHand.set(selflinks.size());
205 allWorkEnumerated = true;
208 while (!isSyncDone()) {
214 * Make sure we don't hang on to retries that failed which could cause issues during future
217 retryLimitTracker.clear();
219 } catch (Exception exc) {
// NOTE(review): the exception is discarded here as well and the method still returns OK.
220 // TODO -> LOG, what should be logged here?
223 return OperationState.OK;
227 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
// Entry point of a synchronization run (see IndexSynchronizer#doSync()).
// In the lines shown it initializes the MDC logging context with a fresh random transaction
// id, clears the allWorkEnumerated flag, records the start time and returns OperationState.OK.
// NOTE(review): the embedded numbering jumps 232 -> 235 and 236 -> 239, so at least one
// statement is missing from this listing. Presumably that includes the call to
// collectAllTheWork(), which has no visible caller; confirm against the complete file.
230 public OperationState doSync() {
231 String txnID = NodeUtils.getRandomTxnId();
232 MdcContext.initialize(txnID, "SearchableEntitySynchronizer", "", "Sync", "");
235 this.allWorkEnumerated = false;
// Start time used by getStatReport(boolean) to compute the elapsed duration.
236 syncStartedTimeStampInMs = System.currentTimeMillis();
239 return OperationState.OK;
243 * Process entity type self links.
245 * @param operationResult the operation result
247 private void processEntityTypeSelfLinks(OperationResult operationResult) {
249 JsonNode rootNode = null;
251 final String jsonResult = operationResult.getResult();
253 if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
256 rootNode = mapper.readTree(jsonResult);
257 } catch (IOException exc) {
259 "Could not deserialize JSON (representing operation result) as node tree. " +
260 "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
261 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
264 JsonNode resultData = rootNode.get("result-data");
265 ArrayNode resultDataArrayNode = null;
267 if (resultData.isArray()) {
268 resultDataArrayNode = (ArrayNode) resultData;
270 Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
271 JsonNode element = null;
273 while (elementIterator.hasNext()) {
274 element = elementIterator.next();
276 final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
277 final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
279 OxmEntityDescriptor descriptor = null;
281 if (resourceType != null && resourceLink != null) {
283 descriptor = oxmModelLoader.getEntityDescriptor(resourceType);
285 if (descriptor == null) {
286 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
287 // go to next element in iterator
291 if (descriptor.hasSearchableAttributes()) {
292 selflinks.add(new SelfLinkDescriptor(resourceLink, SynchronizerConfiguration.NODES_ONLY_MODIFIER, resourceType));
305 private void syncEntityTypes() {
307 while (selflinks.peek() != null) {
309 SelfLinkDescriptor linkDescriptor = selflinks.poll();
310 aaiWorkOnHand.decrementAndGet();
312 OxmEntityDescriptor descriptor = null;
314 if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
316 descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
318 if (descriptor == null) {
319 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
320 // go to next element in iterator
324 NetworkTransaction txn = new NetworkTransaction();
325 txn.setDescriptor(descriptor);
326 txn.setLink(linkDescriptor.getSelfLink());
327 txn.setOperationType(HttpMethod.GET);
328 txn.setEntityType(linkDescriptor.getEntityType());
330 aaiWorkOnHand.incrementAndGet();
332 supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
333 .whenComplete((result, error) -> {
335 aaiWorkOnHand.decrementAndGet();
338 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
340 if (result == null) {
341 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
342 linkDescriptor.getSelfLink());
344 updateActiveInventoryCounters(result);
345 fetchDocumentForUpsert(result);
356 * Perform document upsert.
358 * @param esGetTxn the es get txn
// Stores the searchable entity in Elasticsearch, based on the outcome of the preceding
// GET (esGetTxn):
//   - result code 404: logged through ES_SIMPLE_PUT; wasEntryDiscovered stays false.
//   - result code 200: the document exists; its version is extracted and the "_source" of the
//     response is merged with the entity's index document (ObjectMapper.readerForUpdating)
//     to form the payload, which is sent as a bulk import request to the bulk URL.
//   - any other code: reported through ES_OPERATION_RETURN_CODE at ERROR level.
// Both write paths run on esPutExecutor. esWorkOnHand is incremented before the task is
// submitted and decremented in its completion callback, where updateElasticSearchCounters()
// and processStoreDocumentResult() are invoked.
// NOTE(review): the embedded numbering has gaps (for example 371 -> 377, 391 -> 393,
// 397 -> 402 and 461 -> 467). Declarations, try/else lines, one argument of the version
// extraction and any early returns sit on those lines and are not visible here; confirm
// against the complete file before changing the control flow.
361 protected void performDocumentUpsert(NetworkTransaction esGetTxn, SearchableEntity se) {
365 * As part of the response processing we need to do the following:
366 * <li>1. Extract the version (if present), it will be the ETAG when we use the
367 * Search-Abstraction-Service
368 * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
370 * <li>a) if version is null or RC=404, then standard put, no _update with version tag
371 * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
377 link = getElasticFullUrl("/" + se.getId(), getIndexName());
378 } catch (Exception exc) {
379 LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
383 String versionNumber = null;
384 boolean wasEntryDiscovered = false;
385 if (esGetTxn.getOperationResult().getResultCode() == 404) {
386 LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, se.getEntityPrimaryKeyValue());
387 } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
388 wasEntryDiscovered = true;
390 versionNumber = NodeUtils.extractFieldValueFromObject(
391 NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
393 } catch (IOException exc) {
395 "Error extracting version number from response, aborting searchable entity sync of "
396 + se.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
397 LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
402 * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
405 LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
406 String.valueOf(esGetTxn.getOperationResult().getResultCode()));
411 String jsonPayload = null;
412 if (wasEntryDiscovered) {
414 ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
// NOTE(review): the GET response body is converted to a JsonNode a second time here; the
// first conversion happens during the version extraction above.
415 NodeUtils.extractObjectsByKey(
416 NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
417 "_source", sourceObject);
419 if (!sourceObject.isEmpty()) {
420 String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
// Load the stored document, then overlay the freshly built index document onto it.
421 MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
422 ObjectReader updater = mapper.readerForUpdating(me);
423 MergableEntity merged = updater.readValue(se.getIndexDocumentJson());
424 jsonPayload = mapper.writeValueAsString(merged);
426 } catch (IOException exc) {
428 "Error extracting source value from response, aborting searchable entity sync of "
429 + se.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
430 LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
// No existing document: the entity's own index document is the payload.
434 jsonPayload = se.getIndexDocumentJson();
437 if (wasEntryDiscovered) {
// NOTE(review): when the existing document yields no "_source" object, jsonPayload is still
// null at this point and no update is submitted; the lines shown do not log that case.
438 if (versionNumber != null && jsonPayload != null) {
440 String requestPayload = esDataProvider.buildBulkImportOperationRequest(getIndexName(),
441 ElasticSearchConfig.getConfig().getType(), se.getId(), versionNumber, jsonPayload);
443 NetworkTransaction transactionTracker = new NetworkTransaction();
444 transactionTracker.setEntityType(esGetTxn.getEntityType());
445 transactionTracker.setDescriptor(esGetTxn.getDescriptor());
446 transactionTracker.setOperationType(HttpMethod.PUT);
448 esWorkOnHand.incrementAndGet();
449 supplyAsync(new PerformElasticSearchUpdate(ElasticSearchConfig.getConfig().getBulkUrl(),
450 requestPayload, esDataProvider, transactionTracker), esPutExecutor)
451 .whenComplete((result, error) -> {
453 esWorkOnHand.decrementAndGet();
456 String message = "Searchable entity sync UPDATE PUT error - "
457 + error.getLocalizedMessage();
458 LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
460 updateElasticSearchCounters(result);
461 processStoreDocumentResult(result, esGetTxn, se);
// Plain PUT of the payload to the document URL built at the top of the method.
467 if (link != null && jsonPayload != null) {
469 NetworkTransaction updateElasticTxn = new NetworkTransaction();
470 updateElasticTxn.setLink(link);
471 updateElasticTxn.setEntityType(esGetTxn.getEntityType());
472 updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
473 updateElasticTxn.setOperationType(HttpMethod.PUT);
475 esWorkOnHand.incrementAndGet();
476 supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, esDataProvider),
477 esPutExecutor).whenComplete((result, error) -> {
479 esWorkOnHand.decrementAndGet();
// NOTE(review): this message text is the same as the one used by the bulk-update path above,
// so the two failure sources cannot be told apart in the log.
483 "Searchable entity sync UPDATE PUT error - " + error.getLocalizedMessage();
484 LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
486 updateElasticSearchCounters(result);
487 processStoreDocumentResult(result, esGetTxn, se);
492 } catch (Exception exc) {
493 String message = "Exception caught during searchable entity sync PUT operation. Message - "
494 + exc.getLocalizedMessage();
495 LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
500 * Populate searchable entity document.
503 * @param result the result
504 * @param resultDescriptor the result descriptor
505 * @throws JsonProcessingException the json processing exception
506 * @throws IOException Signals that an I/O exception has occurred.
508 protected void populateSearchableEntityDocument(SearchableEntity doc, String result,
509 OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
511 doc.setEntityType(resultDescriptor.getEntityName());
513 JsonNode entityNode = mapper.readTree(result);
515 List<String> primaryKeyValues = new ArrayList<String>();
516 String pkeyValue = null;
518 for (String keyName : resultDescriptor.getPrimaryKeyAttributeName()) {
519 pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
520 if (pkeyValue != null) {
521 primaryKeyValues.add(pkeyValue);
523 String message = "populateSearchableEntityDocument(), pKeyValue is null for entityType = "
524 + resultDescriptor.getEntityName();
525 LOG.warn(AaiUiMsgs.WARN_GENERIC, message);
529 final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
530 doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
532 final List<String> searchTagFields = resultDescriptor.getSearchableAttributes();
535 * Based on configuration, use the configured field names for this entity-Type to build a
536 * multi-value collection of search tags for elastic search entity search criteria.
538 for (String searchTagField : searchTagFields) {
539 String searchTagValue = NodeUtils.getNodeFieldAsText(entityNode, searchTagField);
540 if (searchTagValue != null && !searchTagValue.isEmpty()) {
541 doc.addSearchTagWithKey(searchTagValue, searchTagField);
547 * Fetch document for upsert.
// Handles a completed AAI retrieval (txn). When the entity type has searchable attributes and
// the result body is non-empty, a SearchableEntity is built from the JSON and an asynchronous
// Elasticsearch GET is started on esExecutor; its result is passed, together with the entity,
// to performDocumentUpsert(). esWorkOnHand is incremented before the GET is submitted and
// decremented in its completion callback.
// NOTE(review): the embedded numbering has gaps (for example 554 -> 559, 566 -> 571 and
// 577 -> 579). Statements on those lines, presumably the early return after the failure log,
// further preparation of the entity, the link declaration and the setLink call on n2, are not
// visible here; confirm against the complete file.
551 private void fetchDocumentForUpsert(NetworkTransaction txn) {
552 if (!txn.getOperationResult().wasSuccessful()) {
553 String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
554 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
559 if (txn.getDescriptor().hasSearchableAttributes()) {
561 final String jsonResult = txn.getOperationResult().getResult();
562 if (jsonResult != null && jsonResult.length() > 0) {
564 SearchableEntity se = new SearchableEntity(oxmModelLoader);
565 se.setLink( txn.getLink() );
566 populateSearchableEntityDocument(se, jsonResult, txn.getDescriptor());
// URL of the Elasticsearch document that belongs to this entity id.
571 link = getElasticFullUrl("/" + se.getId(), getIndexName());
572 } catch (Exception exc) {
573 LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
577 NetworkTransaction n2 = new NetworkTransaction();
579 n2.setEntityType(txn.getEntityType());
580 n2.setDescriptor(txn.getDescriptor());
581 n2.setOperationType(HttpMethod.GET);
583 esWorkOnHand.incrementAndGet();
585 supplyAsync(new PerformElasticSearchRetrieval(n2, esDataProvider), esExecutor)
586 .whenComplete((result, error) -> {
588 esWorkOnHand.decrementAndGet();
591 LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
593 updateElasticSearchCounters(result);
594 performDocumentUpsert(result, se);
// NOTE(review): both catch blocks below are empty, so an entity whose JSON cannot be
// processed is dropped without any log entry.
601 } catch (JsonProcessingException exc) {
602 // TODO -> LOG, what should be logged here?
603 } catch (IOException exc) {
604 // TODO -> LOG, what should be logged here?
609 * Process store document result.
611 * @param esPutResult the es put result
612 * @param esGetResult the es get result
615 private void processStoreDocumentResult(NetworkTransaction esPutResult,
616 NetworkTransaction esGetResult, SearchableEntity se) {
618 OperationResult or = esPutResult.getOperationResult();
620 if (!or.wasSuccessful()) {
621 if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
623 if (shouldAllowRetry(se.getId())) {
624 esWorkOnHand.incrementAndGet();
626 RetrySearchableEntitySyncContainer rsc =
627 new RetrySearchableEntitySyncContainer(esGetResult, se);
628 retryQueue.push(rsc);
630 String message = "Store document failed during searchable entity synchronization"
631 + " due to version conflict. Entity will be re-synced.";
632 LOG.warn(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
636 "Store document failed during searchable entity synchronization with result code "
637 + or.getResultCode() + " and result message " + or.getResult();
638 LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
644 * Perform retry sync.
// Drains retryQueue. For each queued container the Elasticsearch document URL of the entity is
// rebuilt and a fresh asynchronous GET is started on esExecutor; the completion callback
// decrements esWorkOnHand and calls performDocumentUpsert() with the new GET result.
// NOTE(review): the embedded numbering has gaps (for example 649 -> 652, 653 -> 658 and
// 662 -> 666), so the link declaration and any guards around the polled container or a failed
// URL construction are not visible here. In the lines shown a failure of getElasticFullUrl()
// is only logged; whether the esWorkOnHand count taken when the retry was queued is released
// on that path cannot be told from this listing. Verify against the complete file.
646 private void performRetrySync() {
647 while (retryQueue.peek() != null) {
649 RetrySearchableEntitySyncContainer rsc = retryQueue.poll();
652 SearchableEntity se = rsc.getSearchableEntity();
653 NetworkTransaction txn = rsc.getNetworkTransaction();
658 * In this retry flow the se object has already derived its fields
660 link = getElasticFullUrl("/" + se.getId(), getIndexName());
661 } catch (Exception exc) {
662 LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
// The retry starts with a new GET so that the upsert works on the current stored document.
666 NetworkTransaction retryTransaction = new NetworkTransaction();
667 retryTransaction.setLink(link);
668 retryTransaction.setEntityType(txn.getEntityType());
669 retryTransaction.setDescriptor(txn.getDescriptor());
670 retryTransaction.setOperationType(HttpMethod.GET);
673 * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
674 * called incrementAndGet when queuing the failed PUT!
677 supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, esDataProvider),
678 esExecutor).whenComplete((result, error) -> {
680 esWorkOnHand.decrementAndGet();
683 LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
685 updateElasticSearchCounters(result);
686 performDocumentUpsert(result, se);
696 * Should allow retry.
699 * @return true, if successful
701 private boolean shouldAllowRetry(String id) {
702 boolean isRetryAllowed = true;
703 if (retryLimitTracker.get(id) != null) {
704 Integer currentCount = retryLimitTracker.get(id);
705 if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
706 isRetryAllowed = false;
707 String message = "Searchable entity re-sync limit reached for " + id
708 + ", re-sync will no longer be attempted for this entity";
709 LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
711 Integer newCount = new Integer(currentCount.intValue() + 1);
712 retryLimitTracker.put(id, newCount);
715 Integer firstRetryCount = new Integer(1);
716 retryLimitTracker.put(id, firstRetryCount);
719 return isRetryAllowed;
// Reports the state of this synchronizer: PERFORMING_SYNCHRONIZATION or IDLE.
// NOTE(review): the condition that selects between the two return statements sits on a line
// missing from this listing (the embedded numbering jumps 723 -> 725); presumably it is based
// on isSyncDone(). Confirm against the complete file.
723 public SynchronizerState getState() {
725 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
728 return SynchronizerState.IDLE;
733 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
// Builds the statistics report by delegating to a multi-argument getStatReport overload; the
// first argument is the time elapsed since syncStartedTimeStampInMs, which doSync() sets.
// NOTE(review): the remaining arguments of the call are on a line missing from this listing.
736 public String getStatReport(boolean showFinalReport) {
737 return this.getStatReport(System.currentTimeMillis() - syncStartedTimeStampInMs,
742 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
745 public void shutdown() {
746 this.shutdownExecutors();
750 protected boolean isSyncDone() {
751 int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
753 if (totalWorkOnHand > 0 || !allWorkEnumerated) {