/**
- * ============LICENSE_START===================================================
- * SPARKY (AAI UI service)
- * ============================================================================
- * Copyright © 2017 AT&T Intellectual Property.
- * Copyright © 2017 Amdocs
- * All rights reserved.
- * ============================================================================
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=====================================================
- *
- * ECOMP and OpenECOMP are trademarks
- * and service marks of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
*/
package org.onap.aai.sparky.autosuggestion.sync;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.sparky.sync.enumeration.OperationState;
import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
+import org.onap.aai.sparky.sync.task.PerformSearchServicePut;
+import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval;
import org.onap.aai.sparky.util.NodeUtils;
import org.onap.aai.sparky.util.SuggestionsPermutation;
import org.slf4j.MDC;
/**
* Instantiates a new historical entity summarizer.
*
- * @param indexName the index name
* @throws Exception the exception
*/
public AutosuggestionSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
suggestionEntityLookup.getSuggestionSearchEntityDescriptors();
if (descriptorMap.isEmpty()) {
+ this.allWorkEnumerated = true;
LOG.error(AaiUiMsgs.ERROR_LOADING_OXM_SUGGESTIBLE_ENTITIES);
LOG.info(AaiUiMsgs.ERROR_LOADING_OXM_SUGGESTIBLE_ENTITIES);
return OperationState.ERROR;
*/
private void processEntityTypeSelfLinks(OperationResult operationResult) {
- JsonNode rootNode = null;
-
- if ( operationResult == null ) {
- return;
+ if (operationResult == null) {
+ return;
}
final String jsonResult = operationResult.getResult();
if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
try {
- rootNode = mapper.readTree(jsonResult);
- } catch (IOException exc) {
- String message = "Could not deserialize JSON (representing operation result) as node tree. "
- + "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
- LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
- }
+ JsonNode rootNode = mapper.readTree(jsonResult);
+ JsonNode resultData = rootNode.get("result-data");
- JsonNode resultData = rootNode.get("result-data");
- ArrayNode resultDataArrayNode = null;
+ if (resultData.isArray()) {
+ ArrayNode resultDataArrayNode = (ArrayNode) resultData;
- if (resultData.isArray()) {
- resultDataArrayNode = (ArrayNode) resultData;
+ Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
- Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
- JsonNode element = null;
+ while (elementIterator.hasNext()) {
+ JsonNode element = elementIterator.next();
- while (elementIterator.hasNext()) {
- element = elementIterator.next();
+ final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
+ final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
- final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
- final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
+ if (resourceType != null && resourceLink != null) {
- OxmEntityDescriptor descriptor = null;
+ OxmEntityDescriptor descriptor = oxmEntityLookup.getEntityDescriptors().get(resourceType);
- if (resourceType != null && resourceLink != null) {
-
- descriptor = oxmEntityLookup.getEntityDescriptors().get(resourceType);
-
- if (descriptor == null) {
- LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
- // go to next element in iterator
- continue;
+ if (descriptor == null) {
+ LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
+ // go to next element in iterator
+ continue;
+ }
+ selflinks.add(new SelfLinkDescriptor(resourceLink,
+ SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
}
- selflinks.add(new SelfLinkDescriptor(resourceLink,
- SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
-
-
}
}
+ } catch (IOException exc) {
+ String message = "Could not deserialize JSON (representing operation result) as node tree. "
+ + "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
+ LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
}
}
}
aaiWorkOnHand.incrementAndGet();
- supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
+ supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter,"sync"), aaiExecutor)
.whenComplete((result, error) -> {
aaiWorkOnHand.decrementAndGet();
if (attr != null) {
suggestableAttr = Arrays.asList(attr.split(","));
- List<String> suggestableValue = new ArrayList<String>();
+ List<String> suggestableValue = new ArrayList<>();
for (String attribute : suggestableAttr) {
if (node.get(attribute) != null && node.get(attribute).asText().length() > 0) {
suggestableValue.add(attribute);
}
}
- return new ArrayList<String>();
+ return new ArrayList<>();
}
/**
if (sse.isSuggestableDoc()) {
String link = null;
try {
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId());
+ link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), sse.getId());
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
}
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
- .whenComplete((result, error) -> {
+ supplyAsync(new PerformSearchServiceRetrieval(n2, searchServiceAdapter), esExecutor)
+ .whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
*/
String link = null;
try {
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId());
+ link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), sse.getId());
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
return;
updateElasticTxn.setOperationType(HttpMethod.PUT);
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
- esPutExecutor).whenComplete((result, error) -> {
+ supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter),
+ esPutExecutor).whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
SuggestionSearchEntity sus = susc.getSuggestionSearchEntity();
NetworkTransaction txn = susc.getNetworkTransaction();
- String link = null;
- try {
- /*
- * In this retry flow the se object has already derived its fields
- */
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sus.getId());
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
- }
-
- if (link != null) {
- NetworkTransaction retryTransaction = new NetworkTransaction();
- retryTransaction.setLink(link);
- retryTransaction.setEntityType(txn.getEntityType());
- retryTransaction.setDescriptor(txn.getDescriptor());
- retryTransaction.setOperationType(HttpMethod.GET);
-
- /*
- * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
- * called incrementAndGet when queuing the failed PUT!
- */
-
- supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
- esExecutor).whenComplete((result, error) -> {
-
- esWorkOnHand.decrementAndGet();
-
- if (error != null) {
- LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
- } else {
- updateElasticSearchCounters(result);
- performDocumentUpsert(result, sus);
- }
- });
- }
+ final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result, sus);
+ performRetrySync(sus.getId(), networkTransactionConsumer, txn);
}
}