2 * ============LICENSE_START===================================================
3 * SPARKY (AAI UI service)
4 * ============================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=====================================================
22 * ECOMP and OpenECOMP are trademarks
23 * and service marks of AT&T Intellectual Property.
26 package org.openecomp.sparky.synchronizer;
28 import static java.util.concurrent.CompletableFuture.supplyAsync;
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.UnknownHostException;
33 import java.sql.Timestamp;
34 import java.text.SimpleDateFormat;
35 import java.util.ArrayList;
36 import java.util.Collection;
37 import java.util.Deque;
38 import java.util.EnumSet;
39 import java.util.HashMap;
40 import java.util.Iterator;
41 import java.util.List;
43 import java.util.Map.Entry;
45 import java.util.concurrent.ConcurrentHashMap;
46 import java.util.concurrent.ConcurrentLinkedDeque;
47 import java.util.concurrent.ExecutorService;
48 import java.util.concurrent.atomic.AtomicInteger;
49 import java.util.function.Supplier;
51 import javax.json.Json;
53 import org.openecomp.cl.api.Logger;
54 import org.openecomp.cl.eelf.LoggerFactory;
55 import org.openecomp.sparky.config.oxm.OxmEntityDescriptor;
56 import org.openecomp.sparky.dal.NetworkTransaction;
57 import org.openecomp.sparky.dal.aai.config.ActiveInventoryConfig;
58 import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig;
59 import org.openecomp.sparky.dal.rest.HttpMethod;
60 import org.openecomp.sparky.dal.rest.OperationResult;
61 import org.openecomp.sparky.logging.AaiUiMsgs;
62 import org.openecomp.sparky.synchronizer.config.SynchronizerConfiguration;
63 import org.openecomp.sparky.synchronizer.entity.AggregationEntity;
64 import org.openecomp.sparky.synchronizer.entity.MergableEntity;
65 import org.openecomp.sparky.synchronizer.entity.SelfLinkDescriptor;
66 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
67 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
68 import org.openecomp.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
69 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchPut;
70 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchRetrieval;
71 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchUpdate;
72 import org.openecomp.sparky.util.NodeUtils;
75 import org.openecomp.cl.mdc.MdcContext;
76 import com.fasterxml.jackson.core.JsonProcessingException;
77 import com.fasterxml.jackson.databind.JsonNode;
78 import com.fasterxml.jackson.databind.ObjectReader;
79 import com.fasterxml.jackson.databind.node.ArrayNode;
82 * The Class AutosuggestionSynchronizer.
84 public class AggregationSynchronizer extends AbstractEntitySynchronizer
85 implements IndexSynchronizer {
88 * The Class RetryAggregationEntitySyncContainer.
90 private class RetryAggregationEntitySyncContainer {
91 NetworkTransaction txn;
95 * Instantiates a new retry aggregation entity sync container.
100 public RetryAggregationEntitySyncContainer(NetworkTransaction txn, AggregationEntity ae) {
105 public NetworkTransaction getNetworkTransaction() {
109 public AggregationEntity getAggregationEntity() {
114 private static final Logger LOG =
115 LoggerFactory.getInstance().getLogger(AggregationSynchronizer.class);
116 private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
118 private boolean allWorkEnumerated;
119 private Deque<SelfLinkDescriptor> selflinks;
120 private Deque<RetryAggregationEntitySyncContainer> retryQueue;
121 private Map<String, Integer> retryLimitTracker;
122 protected ExecutorService esPutExecutor;
123 private ConcurrentHashMap<String, AtomicInteger> entityCounters;
124 private boolean syncInProgress;
125 private Map<String, String> contextMap;
126 private String entityType;
129 * Instantiates a new entity aggregation synchronizer.
131 * @param indexName the index name
132 * @throws Exception the exception
134 public AggregationSynchronizer(String entityType, String indexName) throws Exception {
135 super(LOG, "AGGES-" + indexName.toUpperCase(), 2, 5, 5, indexName); // multiple Autosuggestion
136 // Entity Synchronizer will
137 // run for different indices
139 this.entityType = entityType;
140 this.allWorkEnumerated = false;
141 this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
142 this.synchronizerName = "Entity Aggregation Synchronizer";
143 this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
144 this.syncInProgress = false;
145 this.allWorkEnumerated = false;
146 this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
147 this.retryQueue = new ConcurrentLinkedDeque<RetryAggregationEntitySyncContainer>();
148 this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
150 this.esPutExecutor = NodeUtils.createNamedExecutor("AGGES-ES-PUT", 1, LOG);
151 Map<String, OxmEntityDescriptor> descriptor = new HashMap<String, OxmEntityDescriptor>();
152 descriptor.put(entityType, oxmModelLoader.getEntityDescriptors().get(entityType));
153 this.aaiEntityStats.initializeCountersFromOxmEntityDescriptors(
155 this.esEntityStats.initializeCountersFromOxmEntityDescriptors(
157 this.contextMap = MDC.getCopyOfContextMap();
161 * Collect all the work.
163 * @return the operation state
165 private OperationState collectAllTheWork() {
166 final Map<String, String> contextMap = MDC.getCopyOfContextMap();
167 final String entity = this.getEntityType();
170 aaiWorkOnHand.set(1);
172 supplyAsync(new Supplier<Void>() {
176 MDC.setContextMap(contextMap);
177 OperationResult typeLinksResult = null;
179 typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(entity);
180 aaiWorkOnHand.decrementAndGet();
181 processEntityTypeSelfLinks(typeLinksResult);
182 } catch (Exception exc) {
183 // TODO -> LOG, what should be logged here?
189 }, aaiExecutor).whenComplete((result, error) -> {
192 LOG.error(AaiUiMsgs.ERROR_GENERIC,
193 "An error occurred getting data from AAI. Error = " + error.getMessage());
197 while (aaiWorkOnHand.get() != 0) {
199 if (LOG.isDebugEnabled()) {
200 LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
206 aaiWorkOnHand.set(selflinks.size());
207 allWorkEnumerated = true;
210 while (!isSyncDone()) {
216 * Make sure we don't hang on to retries that failed which could cause issues during future
219 retryLimitTracker.clear();
221 } catch (Exception exc) {
222 // TODO -> LOG, waht should be logged here?
225 return OperationState.OK;
230 * Perform retry sync.
232 private void performRetrySync() {
233 while (retryQueue.peek() != null) {
235 RetryAggregationEntitySyncContainer rsc = retryQueue.poll();
238 AggregationEntity ae = rsc.getAggregationEntity();
239 NetworkTransaction txn = rsc.getNetworkTransaction();
244 * In this retry flow the se object has already derived its fields
246 link = getElasticFullUrl("/" + ae.getId(), getIndexName());
247 } catch (Exception exc) {
248 LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
252 NetworkTransaction retryTransaction = new NetworkTransaction();
253 retryTransaction.setLink(link);
254 retryTransaction.setEntityType(txn.getEntityType());
255 retryTransaction.setDescriptor(txn.getDescriptor());
256 retryTransaction.setOperationType(HttpMethod.GET);
259 * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
260 * called incrementAndGet when queuing the failed PUT!
263 supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, esDataProvider),
264 esExecutor).whenComplete((result, error) -> {
266 esWorkOnHand.decrementAndGet();
269 LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
271 updateElasticSearchCounters(result);
272 performDocumentUpsert(result, ae);
282 * Perform document upsert.
284 * @param esGetTxn the es get txn
287 protected void performDocumentUpsert(NetworkTransaction esGetTxn, AggregationEntity ae) {
291 * As part of the response processing we need to do the following:
292 * <li>1. Extract the version (if present), it will be the ETAG when we use the
293 * Search-Abstraction-Service
294 * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
296 * <li>a) if version is null or RC=404, then standard put, no _update with version tag
297 * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
303 link = getElasticFullUrl("/" + ae.getId(), getIndexName());
304 } catch (Exception exc) {
305 LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
309 String versionNumber = null;
310 boolean wasEntryDiscovered = false;
311 if (esGetTxn.getOperationResult().getResultCode() == 404) {
312 LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, ae.getEntityPrimaryKeyValue());
313 } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
314 wasEntryDiscovered = true;
316 versionNumber = NodeUtils.extractFieldValueFromObject(
317 NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
319 } catch (IOException exc) {
321 "Error extracting version number from response, aborting aggregation entity sync of "
322 + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
323 LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
328 * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
331 LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
332 String.valueOf(esGetTxn.getOperationResult().getResultCode()));
337 String jsonPayload = null;
338 if (wasEntryDiscovered) {
340 ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
341 NodeUtils.extractObjectsByKey(
342 NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
343 "_source", sourceObject);
345 if (!sourceObject.isEmpty()) {
346 String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
347 MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
348 ObjectReader updater = mapper.readerForUpdating(me);
349 MergableEntity merged = updater.readValue(ae.getIndexDocumentJson());
350 jsonPayload = mapper.writeValueAsString(merged);
352 } catch (IOException exc) {
354 "Error extracting source value from response, aborting aggregation entity sync of "
355 + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
356 LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
360 jsonPayload = ae.getIndexDocumentJson();
363 if (wasEntryDiscovered) {
364 if (versionNumber != null && jsonPayload != null) {
366 String requestPayload = esDataProvider.buildBulkImportOperationRequest(getIndexName(),
367 ElasticSearchConfig.getConfig().getType(), ae.getId(), versionNumber, jsonPayload);
369 NetworkTransaction transactionTracker = new NetworkTransaction();
370 transactionTracker.setEntityType(esGetTxn.getEntityType());
371 transactionTracker.setDescriptor(esGetTxn.getDescriptor());
372 transactionTracker.setOperationType(HttpMethod.PUT);
374 esWorkOnHand.incrementAndGet();
375 supplyAsync(new PerformElasticSearchUpdate(ElasticSearchConfig.getConfig().getBulkUrl(),
376 requestPayload, esDataProvider, transactionTracker), esPutExecutor)
377 .whenComplete((result, error) -> {
379 esWorkOnHand.decrementAndGet();
382 String message = "Aggregation entity sync UPDATE PUT error - "
383 + error.getLocalizedMessage();
384 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
386 updateElasticSearchCounters(result);
387 processStoreDocumentResult(result, esGetTxn, ae);
393 if (link != null && jsonPayload != null) {
395 NetworkTransaction updateElasticTxn = new NetworkTransaction();
396 updateElasticTxn.setLink(link);
397 updateElasticTxn.setEntityType(esGetTxn.getEntityType());
398 updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
399 updateElasticTxn.setOperationType(HttpMethod.PUT);
401 esWorkOnHand.incrementAndGet();
402 supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, esDataProvider),
403 esPutExecutor).whenComplete((result, error) -> {
405 esWorkOnHand.decrementAndGet();
409 "Aggregation entity sync UPDATE PUT error - " + error.getLocalizedMessage();
410 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
412 updateElasticSearchCounters(result);
413 processStoreDocumentResult(result, esGetTxn, ae);
418 } catch (Exception exc) {
419 String message = "Exception caught during aggregation entity sync PUT operation. Message - "
420 + exc.getLocalizedMessage();
421 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
426 * Should allow retry.
429 * @return true, if successful
431 private boolean shouldAllowRetry(String id) {
432 boolean isRetryAllowed = true;
433 if (retryLimitTracker.get(id) != null) {
434 Integer currentCount = retryLimitTracker.get(id);
435 if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
436 isRetryAllowed = false;
437 String message = "Aggregation entity re-sync limit reached for " + id
438 + ", re-sync will no longer be attempted for this entity";
439 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
441 Integer newCount = new Integer(currentCount.intValue() + 1);
442 retryLimitTracker.put(id, newCount);
445 Integer firstRetryCount = new Integer(1);
446 retryLimitTracker.put(id, firstRetryCount);
449 return isRetryAllowed;
453 * Process store document result.
455 * @param esPutResult the es put result
456 * @param esGetResult the es get result
459 private void processStoreDocumentResult(NetworkTransaction esPutResult,
460 NetworkTransaction esGetResult, AggregationEntity ae) {
462 OperationResult or = esPutResult.getOperationResult();
464 if (!or.wasSuccessful()) {
465 if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
467 if (shouldAllowRetry(ae.getId())) {
468 esWorkOnHand.incrementAndGet();
470 RetryAggregationEntitySyncContainer rsc =
471 new RetryAggregationEntitySyncContainer(esGetResult, ae);
472 retryQueue.push(rsc);
474 String message = "Store document failed during aggregation entity synchronization"
475 + " due to version conflict. Entity will be re-synced.";
476 LOG.warn(AaiUiMsgs.ERROR_GENERIC, message);
480 "Store document failed during aggregation entity synchronization with result code "
481 + or.getResultCode() + " and result message " + or.getResult();
482 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
490 private void syncEntityTypes() {
492 while (selflinks.peek() != null) {
494 SelfLinkDescriptor linkDescriptor = selflinks.poll();
495 aaiWorkOnHand.decrementAndGet();
497 OxmEntityDescriptor descriptor = null;
499 if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
501 descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
503 if (descriptor == null) {
504 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
505 // go to next element in iterator
509 NetworkTransaction txn = new NetworkTransaction();
510 txn.setDescriptor(descriptor);
511 txn.setLink(linkDescriptor.getSelfLink());
512 txn.setOperationType(HttpMethod.GET);
513 txn.setEntityType(linkDescriptor.getEntityType());
515 aaiWorkOnHand.incrementAndGet();
517 supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
518 .whenComplete((result, error) -> {
520 aaiWorkOnHand.decrementAndGet();
523 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
525 if (result == null) {
526 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
527 linkDescriptor.getSelfLink());
529 updateActiveInventoryCounters(result);
530 fetchDocumentForUpsert(result);
541 * Fetch document for upsert.
545 private void fetchDocumentForUpsert(NetworkTransaction txn) {
547 if (!txn.getOperationResult().wasSuccessful()) {
548 String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
549 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
554 final String jsonResult = txn.getOperationResult().getResult();
555 if (jsonResult != null && jsonResult.length() > 0) {
557 AggregationEntity ae = new AggregationEntity(oxmModelLoader);
558 ae.setLink(ActiveInventoryConfig.extractResourcePath(txn.getLink()));
559 populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
564 link = getElasticFullUrl("/" + ae.getId(), getIndexName());
565 } catch (Exception exc) {
566 LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
570 NetworkTransaction n2 = new NetworkTransaction();
572 n2.setEntityType(txn.getEntityType());
573 n2.setDescriptor(txn.getDescriptor());
574 n2.setOperationType(HttpMethod.GET);
576 esWorkOnHand.incrementAndGet();
578 supplyAsync(new PerformElasticSearchRetrieval(n2, esDataProvider), esExecutor)
579 .whenComplete((result, error) -> {
581 esWorkOnHand.decrementAndGet();
584 LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
586 updateElasticSearchCounters(result);
587 performDocumentUpsert(result, ae);
593 } catch (JsonProcessingException exc) {
594 // TODO -> LOG, waht should be logged here?
595 } catch (IOException exc) {
596 // TODO -> LOG, waht should be logged here?
602 * Populate aggregation entity document.
605 * @param result the result
606 * @param resultDescriptor the result descriptor
607 * @throws JsonProcessingException the json processing exception
608 * @throws IOException Signals that an I/O exception has occurred.
610 protected void populateAggregationEntityDocument(AggregationEntity doc, String result,
611 OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
612 doc.setEntityType(resultDescriptor.getEntityName());
613 JsonNode entityNode = mapper.readTree(result);
614 Map<String, Object> map = mapper.convertValue(entityNode, Map.class);
615 doc.copyAttributeKeyValuePair(map);
619 * Process entity type self links.
621 * @param operationResult the operation result
623 private void processEntityTypeSelfLinks(OperationResult operationResult) {
625 JsonNode rootNode = null;
627 final String jsonResult = operationResult.getResult();
629 if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
632 rootNode = mapper.readTree(jsonResult);
633 } catch (IOException exc) {
635 "Could not deserialize JSON (representing operation result) as node tree. " +
636 "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
637 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
640 JsonNode resultData = rootNode.get("result-data");
641 ArrayNode resultDataArrayNode = null;
643 if (resultData.isArray()) {
644 resultDataArrayNode = (ArrayNode) resultData;
646 Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
647 JsonNode element = null;
649 while (elementIterator.hasNext()) {
650 element = elementIterator.next();
652 final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
653 final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
655 OxmEntityDescriptor descriptor = null;
657 if (resourceType != null && resourceLink != null) {
659 descriptor = oxmModelLoader.getEntityDescriptor(resourceType);
661 if (descriptor == null) {
662 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
663 // go to next element in iterator
667 selflinks.add(new SelfLinkDescriptor(resourceLink, SynchronizerConfiguration.NODES_ONLY_MODIFIER, resourceType));
680 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
683 public OperationState doSync() {
684 this.syncDurationInMs = -1;
685 syncStartedTimeStampInMs = System.currentTimeMillis();
686 String txnID = NodeUtils.getRandomTxnId();
687 MdcContext.initialize(txnID, "AggregationSynchronizer", "", "Sync", "");
689 return collectAllTheWork();
693 public SynchronizerState getState() {
696 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
699 return SynchronizerState.IDLE;
706 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
709 public String getStatReport(boolean showFinalReport) {
710 syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
711 return getStatReport(syncDurationInMs, showFinalReport);
714 public String getEntityType() {
718 public void setEntityType(String entityType) {
719 this.entityType = entityType;
725 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
728 public void shutdown() {
729 this.shutdownExecutors();
733 protected boolean isSyncDone() {
735 int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
737 if (LOG.isDebugEnabled()) {
738 LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
739 + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
742 if (totalWorkOnHand > 0 || !allWorkEnumerated) {
746 this.syncInProgress = false;
754 * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
757 public void clearCache() {
759 if (syncInProgress) {
760 LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
761 "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
766 this.resetCounters();
767 if (entityCounters != null) {
768 entityCounters.clear();
771 allWorkEnumerated = false;