2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 package org.onap.aai.sparky.topology.sync;
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Deque;
31 import java.util.Iterator;
32 import java.util.List;
34 import java.util.concurrent.ConcurrentLinkedDeque;
35 import java.util.function.Supplier;
37 import org.onap.aai.cl.api.Logger;
38 import org.onap.aai.cl.eelf.LoggerFactory;
39 import org.onap.aai.cl.mdc.MdcContext;
40 import org.onap.aai.restclient.client.OperationResult;
41 import org.onap.aai.sparky.config.oxm.GeoEntityLookup;
42 import org.onap.aai.sparky.config.oxm.GeoOxmEntityDescriptor;
43 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
44 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
45 import org.onap.aai.sparky.dal.NetworkTransaction;
46 import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
47 import org.onap.aai.sparky.dal.rest.HttpMethod;
48 import org.onap.aai.sparky.inventory.entity.GeoIndexDocument;
49 import org.onap.aai.sparky.logging.AaiUiMsgs;
50 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
51 import org.onap.aai.sparky.sync.IndexSynchronizer;
52 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
53 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
54 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
55 import org.onap.aai.sparky.sync.enumeration.OperationState;
56 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
57 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
58 import org.onap.aai.sparky.sync.task.StoreDocumentTask;
59 import org.onap.aai.sparky.util.NodeUtils;
62 import com.fasterxml.jackson.core.JsonProcessingException;
63 import com.fasterxml.jackson.databind.JsonNode;
64 import com.fasterxml.jackson.databind.node.ArrayNode;
68 * The Class GeoSynchronizer.
70 public class GeoSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer {
72 private static final Logger LOG = LoggerFactory.getInstance().getLogger(GeoSynchronizer.class);
74 private boolean allWorkEnumerated;
75 private Deque<SelfLinkDescriptor> selflinks;
77 private ElasticSearchConfig elasticConfig = null;
78 private Map<String, GeoOxmEntityDescriptor> geoDescriptorMap = null;
81 * Instantiates a new geo synchronizer.
83 * @param indexName the index name
84 * @throws Exception the exception
86 public GeoSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
87 int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
88 NetworkStatisticsConfig esStatConfig) throws Exception {
90 super(LOG, "GEO", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),
91 aaiStatConfig, esStatConfig);
92 this.allWorkEnumerated = false;
93 this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
94 this.synchronizerName = "Geo Synchronizer";
95 this.geoDescriptorMap = GeoEntityLookup.getInstance().getGeoEntityDescriptors();
96 this.aaiEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
97 this.esEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
98 this.syncDurationInMs = -1;
105 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
108 public OperationState doSync() {
109 this.syncDurationInMs = -1;
111 setShouldSkipSync(false);
112 allWorkEnumerated = false;
113 syncStartedTimeStampInMs = System.currentTimeMillis();
114 String txnID = NodeUtils.getRandomTxnId();
115 MdcContext.initialize(txnID, "GeoSynchronizer", "", "Sync", "");
118 return OperationState.OK;
123 * Collect all the work.
125 * @return the operation state
127 public OperationState collectAllTheWork() {
128 final Map<String, String> contextMap = MDC.getCopyOfContextMap();
129 if (elasticConfig == null) {
131 elasticConfig = ElasticSearchConfig.getConfig();
132 } catch (Exception exc) {
133 LOG.error(AaiUiMsgs.CONFIGURATION_ERROR, "Search");
137 if (geoDescriptorMap.isEmpty()) {
138 setShouldSkipSync(true);
139 LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "geo entities");
140 return OperationState.ERROR;
143 Collection<String> syncTypes = geoDescriptorMap.keySet();
148 * launch a parallel async thread to process the documents for each entity-type (to max the of
149 * the configured executor anyway)
152 aaiWorkOnHand.set(syncTypes.size());
154 for (String key : syncTypes) {
156 supplyAsync(new Supplier<Void>() {
160 MDC.setContextMap(contextMap);
161 OperationResult typeLinksResult = null;
163 typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
164 aaiWorkOnHand.decrementAndGet();
165 processEntityTypeSelfLinks(typeLinksResult);
166 } catch (Exception exc) {
167 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
173 }, aaiExecutor).whenComplete((result, error) -> {
176 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
182 while (aaiWorkOnHand.get() != 0) {
184 if (LOG.isDebugEnabled()) {
185 LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
191 aaiWorkOnHand.set(selflinks.size());
192 allWorkEnumerated = true;
195 } catch (Exception exc) {
196 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
198 return OperationState.OK;
204 private void syncEntityTypes() {
206 while (selflinks.peek() != null) {
208 SelfLinkDescriptor linkDescriptor = selflinks.poll();
209 aaiWorkOnHand.decrementAndGet();
211 OxmEntityDescriptor descriptor = null;
213 if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
215 descriptor = OxmEntityLookup.getInstance().getEntityDescriptors()
216 .get(linkDescriptor.getEntityType());
218 if (descriptor == null) {
219 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
220 // go to next element in iterator
224 NetworkTransaction txn = new NetworkTransaction();
225 txn.setDescriptor(descriptor);
226 txn.setLink(linkDescriptor.getSelfLink());
227 txn.setOperationType(HttpMethod.GET);
228 txn.setEntityType(linkDescriptor.getEntityType());
230 aaiWorkOnHand.incrementAndGet();
232 supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
233 .whenComplete((result, error) -> {
235 aaiWorkOnHand.decrementAndGet();
238 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
240 if (result == null) {
241 LOG.error(AaiUiMsgs.SELF_LINK_GET_NO_RESPONSE, linkDescriptor.getSelfLink());
243 processEntityTypeSelfLinkResult(result);
252 * Process entity type self links.
254 * @param operationResult the operation result
256 private void processEntityTypeSelfLinks(OperationResult operationResult) {
258 JsonNode rootNode = null;
260 final String jsonResult = operationResult.getResult();
262 if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
265 rootNode = mapper.readTree(jsonResult);
266 } catch (IOException exc) {
267 LOG.error(AaiUiMsgs.ERROR_GENERIC, exc);
270 JsonNode resultData = rootNode.get("result-data");
271 ArrayNode resultDataArrayNode = null;
273 if (resultData.isArray()) {
274 resultDataArrayNode = (ArrayNode) resultData;
276 Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
277 JsonNode element = null;
279 while (elementIterator.hasNext()) {
280 element = elementIterator.next();
282 final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
283 final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
285 if (resourceType != null && resourceLink != null) {
287 if (geoDescriptorMap.containsKey(resourceType)) {
288 selflinks.add(new SelfLinkDescriptor(resourceLink + "?nodes-only", resourceType));
290 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
291 // go to next element in iterator
303 * Process entity type self link result.
307 private void processEntityTypeSelfLinkResult(NetworkTransaction txn) {
309 updateActiveInventoryCounters(txn);
311 if (!txn.getOperationResult().wasSuccessful()) {
315 GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(txn.getEntityType());
317 if (descriptor == null) {
322 if (descriptor.hasGeoEntity()) {
324 GeoIndexDocument geoDoc = new GeoIndexDocument();
326 final String jsonResult = txn.getOperationResult().getResult();
328 if (jsonResult != null && jsonResult.length() > 0) {
330 populateGeoDocument(geoDoc, jsonResult, txn.getDescriptor(), txn.getLink());
332 if (!geoDoc.isValidGeoDocument()) {
334 LOG.info(AaiUiMsgs.GEO_SYNC_IGNORING_ENTITY, geoDoc.getEntityType(), geoDoc.toString());
340 link = getElasticFullUrl("/" + geoDoc.getId(), getIndexName(), "default");
341 } catch (Exception exc) {
342 LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc);
347 NetworkTransaction n2 = new NetworkTransaction();
349 n2.setEntityType(txn.getEntityType());
350 n2.setDescriptor(txn.getDescriptor());
351 n2.setOperationType(HttpMethod.PUT);
353 esWorkOnHand.incrementAndGet();
355 supplyAsync(new StoreDocumentTask(geoDoc, n2, elasticSearchAdapter), esExecutor)
356 .whenComplete((result, error) -> {
358 esWorkOnHand.decrementAndGet();
361 LOG.error(AaiUiMsgs.ES_STORE_FAILURE, error.getMessage());
363 updateElasticSearchCounters(result);
364 processStoreDocumentResult(result);
371 } catch (JsonProcessingException exc) {
372 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
373 } catch (IOException exc) {
374 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
382 * Process store document result.
386 private void processStoreDocumentResult(NetworkTransaction txn) {
388 OperationResult or = txn.getOperationResult();
390 if (!or.wasSuccessful()) {
391 LOG.error(AaiUiMsgs.ES_STORE_FAILURE, or.toString());
393 * if(or.getResultCode() != 404 || (or.getResultCode() == 404 &&
394 * !synchronizerConfig.isResourceNotFoundErrorsSupressed())) { logger.error(
395 * "Skipping failed resource = " + "link" + " RC=[" + or.getResultCode() + "]. Message: " +
405 public SynchronizerState getState() {
408 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
411 return SynchronizerState.IDLE;
418 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
421 public String getStatReport(boolean showFinalReport) {
422 syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
423 return this.getStatReport(syncDurationInMs, showFinalReport);
429 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
432 public void shutdown() {
433 this.shutdownExecutors();
437 * Populate geo document.
440 * @param result the result
441 * @param resultDescriptor the result descriptor
442 * @param entityLink the entity link
443 * @throws JsonProcessingException the json processing exception
444 * @throws IOException Signals that an I/O exception has occurred.
446 protected void populateGeoDocument(GeoIndexDocument doc, String result,
447 OxmEntityDescriptor resultDescriptor, String entityLink)
448 throws JsonProcessingException, IOException {
450 doc.setSelfLink(entityLink);
451 doc.setEntityType(resultDescriptor.getEntityName());
453 JsonNode entityNode = mapper.readTree(result);
455 List<String> primaryKeyValues = new ArrayList<String>();
456 String pkeyValue = null;
458 for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
459 pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
460 if (pkeyValue != null) {
461 primaryKeyValues.add(pkeyValue);
463 LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
467 final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
468 doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
470 GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(resultDescriptor.getEntityName());
472 String geoLatKey = descriptor.getGeoLatName();
473 String geoLongKey = descriptor.getGeoLongName();
475 doc.setLatitude(NodeUtils.getNodeFieldAsText(entityNode, geoLatKey));
476 doc.setLongitude(NodeUtils.getNodeFieldAsText(entityNode, geoLongKey));
482 protected boolean isSyncDone() {
483 if (shouldSkipSync()) {
484 syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
488 int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
490 if (totalWorkOnHand > 0 || !allWorkEnumerated) {