2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 package org.onap.aai.sparky.topology.sync;
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Deque;
31 import java.util.Iterator;
32 import java.util.List;
34 import java.util.concurrent.ConcurrentLinkedDeque;
35 import java.util.function.Supplier;
37 import org.onap.aai.cl.api.Logger;
38 import org.onap.aai.cl.eelf.LoggerFactory;
39 import org.onap.aai.cl.mdc.MdcContext;
40 import org.onap.aai.restclient.client.OperationResult;
41 import org.onap.aai.sparky.config.oxm.GeoEntityLookup;
42 import org.onap.aai.sparky.config.oxm.GeoOxmEntityDescriptor;
43 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
44 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
45 import org.onap.aai.sparky.dal.NetworkTransaction;
46 import org.onap.aai.sparky.dal.rest.HttpMethod;
47 import org.onap.aai.sparky.inventory.entity.GeoIndexDocument;
48 import org.onap.aai.sparky.logging.AaiUiMsgs;
49 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
50 import org.onap.aai.sparky.sync.IndexSynchronizer;
51 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
52 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
53 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
54 import org.onap.aai.sparky.sync.enumeration.OperationState;
55 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
56 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
57 import org.onap.aai.sparky.sync.task.StoreDocumentTask;
58 import org.onap.aai.sparky.util.NodeUtils;
61 import com.fasterxml.jackson.core.JsonProcessingException;
62 import com.fasterxml.jackson.databind.JsonNode;
63 import com.fasterxml.jackson.databind.node.ArrayNode;
67 * The Class GeoSynchronizer.
// Synchronizer for geo entities. The methods of this class enumerate the self links of every geo
// entity type, retrieve each entity, and hand a GeoIndexDocument to a StoreDocumentTask.
69 public class GeoSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer {
71 private static final Logger LOG = LoggerFactory.getInstance().getLogger(GeoSynchronizer.class);
// False while self links are still being collected; collectAllTheWork() sets it to true and
// isSyncDone() reads it.
73 private boolean allWorkEnumerated;
// Queue of self links: filled by processEntityTypeSelfLinks(), drained by syncEntityTypes().
74 private Deque<SelfLinkDescriptor> selflinks;
75 private GeoEntityLookup geoEntityLookup;
76 private OxmEntityLookup oxmEntityLookup;
// Geo descriptors looked up by entity type; assigned in the constructor from
// geoEntityLookup.getGeoEntityDescriptors().
// NOTE(review): no java.util.Map import is visible in this view of the file - confirm it exists.
78 private Map<String, GeoOxmEntityDescriptor> geoDescriptorMap = null;
81 * Instantiates a new geo synchronizer.
83 * @param schemaConfig the schema config whose index name is passed to the parent synchronizer
84 * @throws Exception the exception
// Builds the synchronizer. The index name is read from schemaConfig and forwarded, together with
// LOG, the literal "GEO", the three worker counts and the two statistics configs, to the
// AbstractEntitySynchronizer constructor.
// NOTE(review): the closing brace of this constructor is not visible in this view.
86 public GeoSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
87 int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
88 NetworkStatisticsConfig esStatConfig, GeoEntityLookup geoEntityLookup,
89 OxmEntityLookup oxmEntityLookup) throws Exception {
91 super(LOG, "GEO", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig);
92 this.geoEntityLookup = geoEntityLookup;
93 this.oxmEntityLookup = oxmEntityLookup;
94 this.allWorkEnumerated = false;
// A concurrent deque: entries are added from tasks that collectAllTheWork() submits to
// aaiExecutor.
95 this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
96 this.synchronizerName = "Geo Synchronizer";
97 this.geoDescriptorMap = geoEntityLookup.getGeoEntityDescriptors();
// Both statistics objects are initialized with the key set of the geo descriptor map.
98 this.aaiEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
99 this.esEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
// Same value doSync() resets the duration to at the start of a run.
100 this.syncDurationInMs = -1;
105 * @see org.onap.aai.sparky.sync.IndexSynchronizer#doSync()
// Prepares a sync run: resets syncDurationInMs and allWorkEnumerated, clears the skip-sync flag,
// records the start timestamp and initializes the MDC context with a random transaction id.
// The visible return value is always OperationState.OK.
// NOTE(review): no statement in the visible lines starts the work collection (for example a call
// to collectAllTheWork()); confirm against the full file.
108 public OperationState doSync() {
109 this.syncDurationInMs = -1;
111 setShouldSkipSync(false);
112 allWorkEnumerated = false;
113 syncStartedTimeStampInMs = System.currentTimeMillis();
114 String txnID = NodeUtils.getRandomTxnId();
115 MdcContext.initialize(txnID, "GeoSynchronizer", "", "Sync", "");
118 return OperationState.OK;
123 * Collect all the work.
125 * @return the operation state
// Collects the self links of every geo entity type. One task per entity type is submitted to
// aaiExecutor; each task asks aaiAdapter for the self links of its type and passes the result to
// processEntityTypeSelfLinks(). Once aaiWorkOnHand has dropped to zero, the counter is reset to
// the number of queued self links and allWorkEnumerated is set to true.
// Returns OperationState.ERROR (after setting the skip-sync flag) when there are no geo
// descriptors; the other visible return is OperationState.OK.
// NOTE(review): several structural lines (try, braces, the body of the wait loop) are not visible
// in this view; the comments below describe only what is shown.
127 public OperationState collectAllTheWork() {
// Snapshot of the caller's MDC context, re-applied inside each async task below.
// NOTE(review): no import for MDC is visible in this view of the file - confirm it exists.
128 final Map<String,String> contextMap = MDC.getCopyOfContextMap();
130 if (geoDescriptorMap.isEmpty()) {
131 setShouldSkipSync(true);
132 LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "geo entities");
133 return OperationState.ERROR;
136 Collection<String> syncTypes = geoDescriptorMap.keySet();
141 * launch a parallel async thread to process the documents for each entity-type (to max the of
142 * the configured executor anyway)
// One unit of work per entity type; each task decrements the counter after its retrieval.
145 aaiWorkOnHand.set(syncTypes.size());
147 for (String key : syncTypes) {
149 supplyAsync(new Supplier<Void>() {
153 MDC.setContextMap(contextMap);
154 OperationResult typeLinksResult = null;
156 typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
// NOTE(review): the counter is decremented BEFORE the links are queued, so the wait loop below
// can observe zero while the last task is still adding to selflinks. The decrement is also
// skipped if getSelfLinksByEntityType throws, unless a finally block exists in lines not
// visible here, which would leave the wait loop without an exit. Confirm against the full file.
157 aaiWorkOnHand.decrementAndGet();
158 processEntityTypeSelfLinks(typeLinksResult);
159 } catch (Exception exc) {
160 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
166 }, aaiExecutor).whenComplete((result, error) -> {
169 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
// Wait until every per-type task has reported in.
175 while (aaiWorkOnHand.get() != 0) {
177 if (LOG.isDebugEnabled()) {
178 LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
// From here on the counter tracks queued self links instead of entity types.
184 aaiWorkOnHand.set(selflinks.size());
185 allWorkEnumerated = true;
188 } catch (Exception exc) {
189 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
191 return OperationState.OK;
// Drains the selflinks queue. For every entry that has both a self link and an entity type, the
// OXM descriptor of that type is looked up, a GET NetworkTransaction is built, and a
// PerformActiveInventoryRetrieval task is submitted to aaiExecutor. The completion callback
// hands the result to processEntityTypeSelfLinkResult().
// NOTE(review): the branch structure (else branches, continue, closing braces) is not fully
// visible in this view.
197 private void syncEntityTypes() {
199 while (selflinks.peek() != null) {
201 SelfLinkDescriptor linkDescriptor = selflinks.poll();
// The queued entry is no longer pending; the counter is raised again below once a retrieval
// for it is actually dispatched.
202 aaiWorkOnHand.decrementAndGet();
204 OxmEntityDescriptor descriptor = null;
206 if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
208 descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
210 if (descriptor == null) {
211 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
212 // go to next element in iterator
216 NetworkTransaction txn = new NetworkTransaction();
217 txn.setDescriptor(descriptor);
218 txn.setLink(linkDescriptor.getSelfLink());
219 txn.setOperationType(HttpMethod.GET);
220 txn.setEntityType(linkDescriptor.getEntityType());
// In-flight retrieval; released in the completion callback.
222 aaiWorkOnHand.incrementAndGet();
224 supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
225 .whenComplete((result, error) -> {
227 aaiWorkOnHand.decrementAndGet();
230 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
232 if (result == null) {
233 LOG.error(AaiUiMsgs.SELF_LINK_GET_NO_RESPONSE, linkDescriptor.getSelfLink());
235 processEntityTypeSelfLinkResult(result);
244 * Process entity type self links.
246 * @param operationResult the operation result
// Parses the JSON of a self-link query and queues one SelfLinkDescriptor per usable element.
// An element is usable when it has both "resource-type" and "resource-link" and the type is a
// key of geoDescriptorMap; the queued link is the resource link with "?nodes-only" appended.
// Nothing is parsed unless the result text is non-empty and the operation was successful.
248 private void processEntityTypeSelfLinks(OperationResult operationResult) {
250 JsonNode rootNode = null;
252 final String jsonResult = operationResult.getResult();
254 if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
257 rootNode = mapper.readTree(jsonResult);
258 } catch (IOException exc) {
259 LOG.error(AaiUiMsgs.ERROR_GENERIC, exc);
// NOTE(review): rootNode is still null here when readTree failed above, and get("result-data")
// returns null when the field is absent; neither is guarded in the visible lines. Confirm
// whether a check exists in the full file.
262 JsonNode resultData = rootNode.get("result-data");
263 ArrayNode resultDataArrayNode = null;
265 if (resultData.isArray()) {
266 resultDataArrayNode = (ArrayNode) resultData;
268 Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
269 JsonNode element = null;
271 while (elementIterator.hasNext()) {
272 element = elementIterator.next();
274 final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
275 final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
277 if (resourceType != null && resourceLink != null) {
279 if (geoDescriptorMap.containsKey(resourceType)) {
280 selflinks.add(new SelfLinkDescriptor(resourceLink + "?nodes-only", resourceType));
// Presumably the else branch for a type without a geo descriptor - the else line itself is not
// visible in this view.
282 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
283 // go to next element in iterator
295 * Process entity type self link result.
// Handles one retrieved entity. After updating the active-inventory counters it looks up the geo
// descriptor for the transaction's entity type; when that descriptor has a geo entity, it builds
// a GeoIndexDocument from the JSON result and submits a StoreDocumentTask for it to esExecutor
// using a new PUT transaction.
// NOTE(review): the bodies of the early guards (unsuccessful result, null descriptor, invalid
// document) are not visible in this view; they presumably return early - confirm.
299 private void processEntityTypeSelfLinkResult(NetworkTransaction txn) {
301 updateActiveInventoryCounters(txn);
303 if (!txn.getOperationResult().wasSuccessful()) {
307 GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(txn.getEntityType());
309 if ( descriptor == null ) {
314 if (descriptor.hasGeoEntity()) {
316 GeoIndexDocument geoDoc = new GeoIndexDocument();
318 final String jsonResult = txn.getOperationResult().getResult();
320 if (jsonResult != null && jsonResult.length() > 0) {
322 populateGeoDocument(geoDoc, jsonResult, txn.getDescriptor(), txn.getLink());
324 if (!geoDoc.isValidGeoDocument()) {
326 LOG.info(AaiUiMsgs.GEO_SYNC_IGNORING_ENTITY, geoDoc.getEntityType(), geoDoc.toString());
// NOTE(review): neither the declaration of link nor any use of it on n2 is visible in this view.
332 link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), geoDoc.getId());
333 } catch (Exception exc) {
334 LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc);
// Separate transaction for the store step; it copies the entity type and descriptor from txn.
339 NetworkTransaction n2 = new NetworkTransaction();
341 n2.setEntityType(txn.getEntityType());
342 n2.setDescriptor(txn.getDescriptor());
343 n2.setOperationType(HttpMethod.PUT);
// In-flight store; released in the completion callback.
345 esWorkOnHand.incrementAndGet();
347 supplyAsync(new StoreDocumentTask(geoDoc, n2, elasticSearchAdapter), esExecutor)
348 .whenComplete((result, error) -> {
350 esWorkOnHand.decrementAndGet();
353 LOG.error(AaiUiMsgs.ES_STORE_FAILURE, error.getMessage());
355 updateElasticSearchCounters(result);
356 processStoreDocumentResult(result);
// Both handlers log the same message; Jackson's JsonProcessingException is itself an IOException.
363 } catch (JsonProcessingException exc) {
364 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
365 } catch (IOException exc) {
366 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
374 * Process store document result.
// Inspects the outcome of a store transaction and logs ES_STORE_FAILURE, with the operation
// result's string form, when the operation was not successful.
378 private void processStoreDocumentResult(NetworkTransaction txn) {
380 OperationResult or = txn.getOperationResult();
382 if (!or.wasSuccessful()) {
383 LOG.error(AaiUiMsgs.ES_STORE_FAILURE, or.toString());
// NOTE(review): the lines below look like the remains of a commented-out block (its comment
// delimiters are not visible in this view) - confirm they are inactive in the full file.
385 * if(or.getResultCode() != 404 || (or.getResultCode() == 404 &&
386 * !synchronizerConfig.isResourceNotFoundErrorsSupressed())) { logger.error(
387 * "Skipping failed resource = " + "link" + " RC=[" + or.getResultCode() + "]. Message: " +
// Reports the synchronizer state as either PERFORMING_SYNCHRONIZATION or IDLE.
// NOTE(review): the condition that selects between the two returns is not visible in this view.
397 public SynchronizerState getState() {
400 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
403 return SynchronizerState.IDLE;
408 * @see org.onap.aai.sparky.sync.IndexSynchronizer#getStatReport(boolean)
// Builds the statistics report. The sync duration is first refreshed as the time elapsed since
// syncStartedTimeStampInMs, then the two-argument getStatReport overload is called with that
// duration and the showFinalReport flag.
411 public String getStatReport(boolean showFinalReport) {
412 syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
413 return this.getStatReport(syncDurationInMs, showFinalReport);
417 * @see org.onap.aai.sparky.sync.IndexSynchronizer#shutdown()
// Shuts the synchronizer down by delegating to shutdownExecutors().
420 public void shutdown() {
421 this.shutdownExecutors();
425 * Populate geo document.
428 * @param result the result
429 * @param resultDescriptor the result descriptor
430 * @param entityLink the entity link
431 * @throws JsonProcessingException the json processing exception
432 * @throws IOException Signals that an I/O exception has occurred.
// Fills doc from the JSON text of one entity: self link, entity type, a composite primary key
// (the primary-key attribute values joined with "/"), and the latitude and longitude read from
// the fields named by the entity type's geo descriptor.
434 protected void populateGeoDocument(GeoIndexDocument doc, String result,
435 OxmEntityDescriptor resultDescriptor, String entityLink)
436 throws JsonProcessingException, IOException {
438 doc.setSelfLink(entityLink);
439 doc.setEntityType(resultDescriptor.getEntityName());
441 JsonNode entityNode = mapper.readTree(result);
443 List<String> primaryKeyValues = new ArrayList<String>();
444 String pkeyValue = null;
446 for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
447 pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
448 if (pkeyValue != null) {
449 primaryKeyValues.add(pkeyValue);
// Presumably the else branch for a missing key value - the else line itself is not visible in
// this view.
451 LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
455 final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
456 doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
// NOTE(review): the lookup result is dereferenced below without a visible null check; the map
// returns null when the entity name has no geo descriptor - confirm callers rule that out.
458 GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(resultDescriptor.getEntityName());
460 String geoLatKey = descriptor.getGeoLatName();
461 String geoLongKey = descriptor.getGeoLongName();
463 doc.setLatitude(NodeUtils.getNodeFieldAsText(entityNode, geoLatKey));
464 doc.setLongitude(NodeUtils.getNodeFieldAsText(entityNode, geoLongKey));
470 protected boolean isSyncDone() {
471 if (shouldSkipSync()) {
472 syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
476 int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
478 if (totalWorkOnHand > 0 || !allWorkEnumerated) {