2 * ============LICENSE_START===================================================
3 * SPARKY (AAI UI service)
4 * ============================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=====================================================
22 * ECOMP and OpenECOMP are trademarks
23 * and service marks of AT&T Intellectual Property.
25 package org.onap.aai.sparky.topology.sync;
27 import static java.util.concurrent.CompletableFuture.supplyAsync;
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Deque;
33 import java.util.Iterator;
34 import java.util.List;
36 import java.util.concurrent.ConcurrentLinkedDeque;
37 import java.util.function.Supplier;
39 import org.onap.aai.cl.api.Logger;
40 import org.onap.aai.cl.eelf.LoggerFactory;
41 import org.onap.aai.cl.mdc.MdcContext;
42 import org.onap.aai.restclient.client.OperationResult;
43 import org.onap.aai.sparky.config.oxm.GeoEntityLookup;
44 import org.onap.aai.sparky.config.oxm.GeoOxmEntityDescriptor;
45 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
46 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
47 import org.onap.aai.sparky.dal.NetworkTransaction;
48 import org.onap.aai.sparky.dal.rest.HttpMethod;
49 import org.onap.aai.sparky.inventory.entity.GeoIndexDocument;
50 import org.onap.aai.sparky.logging.AaiUiMsgs;
51 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
52 import org.onap.aai.sparky.sync.IndexSynchronizer;
53 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
54 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
55 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
56 import org.onap.aai.sparky.sync.enumeration.OperationState;
57 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
58 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
59 import org.onap.aai.sparky.sync.task.StoreDocumentTask;
60 import org.onap.aai.sparky.util.NodeUtils;
63 import com.fasterxml.jackson.core.JsonProcessingException;
64 import com.fasterxml.jackson.databind.JsonNode;
65 import com.fasterxml.jackson.databind.node.ArrayNode;
69 * The Class GeoSynchronizer.
71 public class GeoSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer {
// Class-wide logger obtained from the LoggerFactory singleton.
73 private static final Logger LOG = LoggerFactory.getInstance().getLogger(GeoSynchronizer.class);
// Cleared when a sync starts; set to true once collectAllTheWork() has finished gathering self-links.
75 private boolean allWorkEnumerated;
// Self-links of geo entities; filled by processEntityTypeSelfLinks() and drained by syncEntityTypes().
76 private Deque<SelfLinkDescriptor> selflinks;
// Lookup passed to the constructor; the constructor reads the geo entity descriptors from it.
77 private GeoEntityLookup geoEntityLookup;
// Lookup passed to the constructor; used to resolve the OXM descriptor for a self-link's entity type.
78 private OxmEntityLookup oxmEntityLookup;
// Geo descriptors looked up by entity type; assigned in the constructor from geoEntityLookup.
80 private Map<String, GeoOxmEntityDescriptor> geoDescriptorMap = null;
83 * Instantiates a new geo synchronizer.
85 * @param schemaConfig the Elasticsearch schema config that supplies the index name
86 * @throws Exception the exception
// Builds the synchronizer: forwards the worker counts, the index name taken from schemaConfig and
// the two statistics configs to the superclass, then initialises this class's own state.
88 public GeoSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
89 int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
90 NetworkStatisticsConfig esStatConfig, GeoEntityLookup geoEntityLookup,
91 OxmEntityLookup oxmEntityLookup) throws Exception {
// "GEO" is passed to the superclass as a fixed string argument for this synchronizer.
93 super(LOG, "GEO", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig);
94 this.geoEntityLookup = geoEntityLookup;
95 this.oxmEntityLookup = oxmEntityLookup;
96 this.allWorkEnumerated = false;
// Concurrent deque: self-links are added from async tasks started in collectAllTheWork().
97 this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
98 this.synchronizerName = "Geo Synchronizer";
99 this.geoDescriptorMap = geoEntityLookup.getGeoEntityDescriptors();
// Both statistics trackers get one counter set per geo entity type.
100 this.aaiEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
101 this.esEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
// -1 is the initial duration value; getStatReport() and isSyncDone() overwrite it with an elapsed time.
102 this.syncDurationInMs = -1;
107 * @see org.onap.aai.sparky.sync.IndexSynchronizer#doSync()
// Resets the per-run state (duration, skip flag, enumeration flag), records the start time,
// initialises the MDC logging context with a fresh transaction id and returns OperationState.OK.
110 public OperationState doSync() {
111 this.syncDurationInMs = -1;
113 setShouldSkipSync(false);
114 allWorkEnumerated = false;
// Start timestamp is later subtracted from the current time to compute the sync duration.
115 syncStartedTimeStampInMs = System.currentTimeMillis();
116 String txnID = NodeUtils.getRandomTxnId();
117 MdcContext.initialize(txnID, "GeoSynchronizer", "", "Sync", "");
120 return OperationState.OK;
125 * Collect all the work.
127 * @return the operation state
// Gathers the self-links for every geo entity type. One async task per entity type is submitted to
// aaiExecutor; each fetches the type's self-links and passes the result to
// processEntityTypeSelfLinks(). Returns ERROR when there are no geo descriptors, otherwise OK.
129 public OperationState collectAllTheWork() {
// Snapshot of the caller's MDC context so that worker threads can adopt it.
130 final Map<String,String> contextMap = MDC.getCopyOfContextMap();
// No geo entity descriptors: flag the sync to be skipped and report an error.
132 if (geoDescriptorMap.isEmpty()) {
133 setShouldSkipSync(true);
134 LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "geo entities");
135 return OperationState.ERROR;
138 Collection<String> syncTypes = geoDescriptorMap.keySet();
143 * launch a parallel async thread to process the documents for each entity-type (to max the of
144 * the configured executor anyway)
// The outstanding-work counter starts at one unit per entity type.
147 aaiWorkOnHand.set(syncTypes.size());
149 for (String key : syncTypes) {
151 supplyAsync(new Supplier<Void>() {
// Apply the captured MDC context inside the task.
155 MDC.setContextMap(contextMap);
156 OperationResult typeLinksResult = null;
158 typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
// NOTE(review): the decrement follows the fetch; if getSelfLinksByEntityType throws, confirm the
// counter is still decremented elsewhere, otherwise the wait loop below cannot reach zero.
159 aaiWorkOnHand.decrementAndGet();
160 processEntityTypeSelfLinks(typeLinksResult);
161 } catch (Exception exc) {
162 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
168 }, aaiExecutor).whenComplete((result, error) -> {
171 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
// Loop until every per-type task has decremented the counter to zero.
177 while (aaiWorkOnHand.get() != 0) {
179 if (LOG.isDebugEnabled()) {
180 LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
// Re-seed the counter with the number of collected self-links and mark enumeration complete.
186 aaiWorkOnHand.set(selflinks.size());
187 allWorkEnumerated = true;
190 } catch (Exception exc) {
191 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
193 return OperationState.OK;
// Drains the selflinks deque. For each descriptor that has both a self-link and an entity type, the
// OXM descriptor is looked up, a GET NetworkTransaction is built and a
// PerformActiveInventoryRetrieval task is submitted to aaiExecutor; a retrieved result is passed to
// processEntityTypeSelfLinkResult().
199 private void syncEntityTypes() {
201 while (selflinks.peek() != null) {
203 SelfLinkDescriptor linkDescriptor = selflinks.poll();
// One unit of work is consumed per polled link; it is added back below just before the async submit.
204 aaiWorkOnHand.decrementAndGet();
206 OxmEntityDescriptor descriptor = null;
208 if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
210 descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
// Entity type has no OXM descriptor: log it.
212 if (descriptor == null) {
213 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
214 // go to next element in iterator
218 NetworkTransaction txn = new NetworkTransaction();
219 txn.setDescriptor(descriptor);
220 txn.setLink(linkDescriptor.getSelfLink());
221 txn.setOperationType(HttpMethod.GET);
222 txn.setEntityType(linkDescriptor.getEntityType());
// Counted as outstanding until the completion callback below decrements it.
224 aaiWorkOnHand.incrementAndGet();
226 supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
227 .whenComplete((result, error) -> {
229 aaiWorkOnHand.decrementAndGet();
232 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
234 if (result == null) {
235 LOG.error(AaiUiMsgs.SELF_LINK_GET_NO_RESPONSE, linkDescriptor.getSelfLink());
237 processEntityTypeSelfLinkResult(result);
246 * Process entity type self links.
248 * @param operationResult the operation result
// Parses the JSON payload of a self-link query and, for every element of its "result-data" array
// whose "resource-type" is a known geo entity type, queues a SelfLinkDescriptor on selflinks.
// Payloads that are null, empty or from an unsuccessful operation are ignored.
250 private void processEntityTypeSelfLinks(OperationResult operationResult) {
252 JsonNode rootNode = null;
254 final String jsonResult = operationResult.getResult();
256 if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
259 rootNode = mapper.readTree(jsonResult);
260 } catch (IOException exc) {
261 LOG.error(AaiUiMsgs.ERROR_GENERIC, exc);
// NOTE(review): rootNode is still null here if readTree failed above, and resultData is null when
// the payload has no "result-data" field - confirm both are guarded before being dereferenced.
264 JsonNode resultData = rootNode.get("result-data");
265 ArrayNode resultDataArrayNode = null;
267 if (resultData.isArray()) {
268 resultDataArrayNode = (ArrayNode) resultData;
270 Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
271 JsonNode element = null;
273 while (elementIterator.hasNext()) {
274 element = elementIterator.next();
276 final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
277 final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
279 if (resourceType != null && resourceLink != null) {
// Only geo entity types are queued; "?nodes-only" is appended to the link as a query string.
281 if (geoDescriptorMap.containsKey(resourceType)) {
282 selflinks.add(new SelfLinkDescriptor(resourceLink + "?nodes-only", resourceType));
284 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
285 // go to next element in iterator
297 * Process entity type self link result.
// Handles the result of one self-link retrieval: updates the active-inventory counters, builds a
// GeoIndexDocument from the JSON payload and submits a StoreDocumentTask to esExecutor using a PUT
// transaction. JSON processing failures are logged.
301 private void processEntityTypeSelfLinkResult(NetworkTransaction txn) {
303 updateActiveInventoryCounters(txn);
// Guard for an unsuccessful retrieval.
305 if (!txn.getOperationResult().wasSuccessful()) {
309 GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(txn.getEntityType());
// Guard for an entity type that has no geo descriptor.
311 if ( descriptor == null ) {
316 if (descriptor.hasGeoEntity()) {
318 GeoIndexDocument geoDoc = new GeoIndexDocument();
320 final String jsonResult = txn.getOperationResult().getResult();
322 if (jsonResult != null && jsonResult.length() > 0) {
324 populateGeoDocument(geoDoc, jsonResult, txn.getDescriptor(), txn.getLink());
// Documents that fail geoDoc's own validity check are reported at info level.
326 if (!geoDoc.isValidGeoDocument()) {
328 LOG.info(AaiUiMsgs.GEO_SYNC_IGNORING_ENTITY, geoDoc.getEntityType(), geoDoc.toString());
// Elasticsearch document URL built from the index name and the document id.
334 link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), geoDoc.getId());
335 } catch (Exception exc) {
336 LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc);
// Second transaction describing the store operation; entity type and descriptor are copied from txn.
341 NetworkTransaction n2 = new NetworkTransaction();
343 n2.setEntityType(txn.getEntityType());
344 n2.setDescriptor(txn.getDescriptor());
345 n2.setOperationType(HttpMethod.PUT);
// Counted as outstanding Elasticsearch work until the completion callback decrements it.
347 esWorkOnHand.incrementAndGet();
349 supplyAsync(new StoreDocumentTask(geoDoc, n2, elasticSearchAdapter), esExecutor)
350 .whenComplete((result, error) -> {
352 esWorkOnHand.decrementAndGet();
355 LOG.error(AaiUiMsgs.ES_STORE_FAILURE, error.getMessage());
357 updateElasticSearchCounters(result);
358 processStoreDocumentResult(result);
365 } catch (JsonProcessingException exc) {
366 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
367 } catch (IOException exc) {
368 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
376 * Process store document result.
// Inspects the outcome of a document store transaction and logs an ES_STORE_FAILURE error when the
// operation was not successful.
380 private void processStoreDocumentResult(NetworkTransaction txn) {
382 OperationResult or = txn.getOperationResult();
384 if (!or.wasSuccessful()) {
385 LOG.error(AaiUiMsgs.ES_STORE_FAILURE, or.toString());
// The lines below are retained, disabled code for special handling of 404 result codes.
387 * if(or.getResultCode() != 404 || (or.getResultCode() == 404 &&
388 * !synchronizerConfig.isResourceNotFoundErrorsSupressed())) { logger.error(
389 * "Skipping failed resource = " + "link" + " RC=[" + or.getResultCode() + "]. Message: " +
// Reports the synchronizer's current state: either PERFORMING_SYNCHRONIZATION or IDLE.
399 public SynchronizerState getState() {
402 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
405 return SynchronizerState.IDLE;
410 * @see org.onap.aai.sparky.sync.IndexSynchronizer#getStatReport(boolean)
// Refreshes syncDurationInMs with the time elapsed since the sync started and returns the report
// produced by the two-argument getStatReport overload.
413 public String getStatReport(boolean showFinalReport) {
414 syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
415 return this.getStatReport(syncDurationInMs, showFinalReport);
419 * @see org.onap.aai.sparky.sync.IndexSynchronizer#shutdown()
// Shuts the synchronizer down by delegating to shutdownExecutors().
422 public void shutdown() {
423 this.shutdownExecutors();
427 * Populate geo document.
430 * @param result the result
431 * @param resultDescriptor the result descriptor
432 * @param entityLink the entity link
433 * @throws JsonProcessingException the json processing exception
434 * @throws IOException Signals that an I/O exception has occurred.
// Fills doc from an entity's JSON payload: self-link, entity type, a composite primary key built
// from the descriptor's key attributes, and the latitude/longitude fields named by the entity's
// geo descriptor. Throws if the payload cannot be parsed as JSON.
436 protected void populateGeoDocument(GeoIndexDocument doc, String result,
437 OxmEntityDescriptor resultDescriptor, String entityLink)
438 throws JsonProcessingException, IOException {
440 doc.setSelfLink(entityLink);
441 doc.setEntityType(resultDescriptor.getEntityName());
443 JsonNode entityNode = mapper.readTree(result);
445 List<String> primaryKeyValues = new ArrayList<String>();
446 String pkeyValue = null;
// Collect the value of each primary-key attribute; a missing value is logged as a warning.
448 for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
449 pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
450 if (pkeyValue != null) {
451 primaryKeyValues.add(pkeyValue);
453 LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
// The collected key values are combined with "/" into one composite key.
457 final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
458 doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
// NOTE(review): descriptor is null when the entity name has no geo descriptor entry - confirm
// callers only pass geo entity types, otherwise the calls below dereference null.
460 GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(resultDescriptor.getEntityName());
462 String geoLatKey = descriptor.getGeoLatName();
463 String geoLongKey = descriptor.getGeoLongName();
465 doc.setLatitude(NodeUtils.getNodeFieldAsText(entityNode, geoLatKey));
466 doc.setLongitude(NodeUtils.getNodeFieldAsText(entityNode, geoLongKey));
472 protected boolean isSyncDone() {
473 if (shouldSkipSync()) {
474 syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
478 int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
480 if (totalWorkOnHand > 0 || !allWorkEnumerated) {