2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 package org.onap.aai.sparky.synchronizer;
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Deque;
31 import java.util.Iterator;
32 import java.util.List;
34 import java.util.concurrent.ConcurrentLinkedDeque;
35 import java.util.function.Supplier;
37 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
38 import org.onap.aai.sparky.dal.NetworkTransaction;
39 import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
40 import org.onap.aai.sparky.dal.rest.HttpMethod;
41 import org.onap.aai.sparky.dal.rest.OperationResult;
42 import org.onap.aai.sparky.inventory.entity.GeoIndexDocument;
43 import org.onap.aai.sparky.logging.AaiUiMsgs;
44 import org.onap.aai.sparky.synchronizer.entity.SelfLinkDescriptor;
45 import org.onap.aai.sparky.synchronizer.enumeration.OperationState;
46 import org.onap.aai.sparky.synchronizer.enumeration.SynchronizerState;
47 import org.onap.aai.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
48 import org.onap.aai.sparky.synchronizer.task.StoreDocumentTask;
49 import org.onap.aai.sparky.util.NodeUtils;
50 import org.onap.aai.cl.api.Logger;
51 import org.onap.aai.cl.eelf.LoggerFactory;
52 import org.onap.aai.cl.mdc.MdcContext;
55 import com.fasterxml.jackson.core.JsonProcessingException;
56 import com.fasterxml.jackson.databind.JsonNode;
57 import com.fasterxml.jackson.databind.node.ArrayNode;
61 * Synchronizer that retrieves geo-enabled entities from AAI and stores them as geo documents in the search index.
63 public class GeoSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer {
// Class-wide logger; it is also handed to the superclass constructor.
65 private static final Logger LOG = LoggerFactory.getInstance().getLogger(GeoSynchronizer.class);
// Cleared by the constructor and by doSync(), set to true by collectAllTheWork() once the
// collected self links have been counted; isSyncDone() tests this flag.
67 private boolean allWorkEnumerated;
// Work queue of self links: filled by processEntityTypeSelfLinks() and drained by
// syncEntityTypes(). The constructor backs it with a ConcurrentLinkedDeque.
68 private Deque<SelfLinkDescriptor> selflinks;
// Loaded lazily inside collectAllTheWork() through ElasticSearchConfig.getConfig().
70 private ElasticSearchConfig elasticConfig = null;
// Geo entity descriptors obtained from oxmModelLoader in the constructor; the keys are used
// as entity types when querying AAI.
// NOTE(review): Map is used here but no java.util.Map import is visible in this listing
// (listing line 33 is absent) - confirm the import exists in the real file.
71 private Map<String, OxmEntityDescriptor> geoDescriptorMap = null;
74 * Instantiates a new geo synchronizer.
76 * @param indexName the index name
77 * @throws Exception the exception
// Creates the synchronizer for the given index, prepares the empty self-link queue and seeds
// both statistics trackers from the geo entity descriptors of the OXM model.
// NOTE(review): this listing carries embedded line numbers and omits some original lines
// (for example the closing brace of this constructor); restore from the upstream file.
79 public GeoSynchronizer(String indexName) throws Exception {
// NOTE(review): the meaning of the numeric arguments (2, 5, 5) is defined by the superclass
// constructor, which is not visible here - confirm against AbstractEntitySynchronizer.
81 super(LOG, "GEO", 2, 5, 5, indexName);
82 this.allWorkEnumerated = false;
// Concurrent deque because worker threads add links while the sync loop polls them.
83 this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
84 this.synchronizerName = "Geo Synchronizer";
// Only entity types that have geo descriptors take part in this synchronizer; the AAI and
// ES counters are initialized from the same descriptor map.
85 this.geoDescriptorMap = oxmModelLoader.getGeoEntityDescriptors();
86 this.aaiEntityStats.initializeCountersFromOxmEntityDescriptors(geoDescriptorMap);
87 this.esEntityStats.initializeCountersFromOxmEntityDescriptors(geoDescriptorMap);
// Presumably a sentinel for no duration measured yet; getStatReport(boolean) overwrites it.
88 this.syncDurationInMs = -1;
93 * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#doSync()
// Starts a synchronization pass: clears the enumeration flag, records the start timestamp
// and initializes the MDC logging context with a freshly generated transaction id.
// Returns OperationState.OK in every path visible here.
// NOTE(review): listing lines 97 and 102-103 are absent from this view, and no call that
// launches the work collection is visible - confirm against the full file before relying on
// this description.
96 public OperationState doSync() {
98 allWorkEnumerated = false;
// Start time is later used by getStatReport(boolean) to compute the elapsed duration.
99 syncStartedTimeStampInMs = System.currentTimeMillis();
100 String txnID = NodeUtils.getRandomTxnId();
101 MdcContext.initialize(txnID, "GeoSynchronizer", "", "Sync", "");
104 return OperationState.OK;
109 * Collect all the work.
111 * @return the operation state
// Asks AAI, once per geo entity type and in parallel on aaiExecutor, for the self links of
// that type, waits until every request has been accounted for, then records the number of
// queued self links as outstanding work and marks the work as fully enumerated.
// Returns OperationState.ERROR when there are no geo entity descriptors, otherwise
// OperationState.OK - including when an exception was caught and logged on the way.
// NOTE(review): several original lines are absent from this listing (try openers, the
// Supplier method header, closing braces, listing lines 175 and 179); the comments below
// describe only what is visible.
113 public OperationState collectAllTheWork() {
// Snapshot of the caller MDC context so worker threads can log under the same context.
114 final Map<String,String> contextMap = MDC.getCopyOfContextMap();
// Lazy load of the search configuration; a failure is only logged and execution continues.
115 if (elasticConfig == null) {
117 elasticConfig = ElasticSearchConfig.getConfig();
118 } catch (Exception exc) {
119 LOG.error(AaiUiMsgs.CONFIGURATION_ERROR, "Search");
// Nothing to synchronize without geo descriptors: flag the sync as skippable and bail out.
123 if (geoDescriptorMap.isEmpty()) {
124 setShouldSkipSync(true);
125 LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "geo entities");
126 return OperationState.ERROR;
129 Collection<String> syncTypes = geoDescriptorMap.keySet();
134 * launch a parallel async thread to process the documents for each entity-type (to max the of
135 * the configured executor anyway)
// One unit of outstanding work per entity type; each worker decrements it after its AAI call.
138 aaiWorkOnHand.set(syncTypes.size());
140 for (String key : syncTypes) {
142 supplyAsync(new Supplier<Void>() {
// Re-apply the captured MDC context inside the worker thread.
146 MDC.setContextMap(contextMap);
147 OperationResult typeLinksResult = null;
// NOTE(review): if getSelfLinksByEntityType throws, the decrement below is skipped and the
// wait loop further down never sees the counter reach zero - confirm whether the decrement
// should be unconditional.
149 typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(key);
// NOTE(review): the counter is decremented before the self links are queued, so the waiting
// thread may read selflinks.size() while this worker is still adding entries - confirm.
150 aaiWorkOnHand.decrementAndGet();
151 processEntityTypeSelfLinks(typeLinksResult);
152 } catch (Exception exc) {
153 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
159 }, aaiExecutor).whenComplete((result, error) -> {
// Only the message of the failure is logged here; the guard around this call is not visible.
162 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
// Poll until every per-type request has been accounted for.
// NOTE(review): listing line 175 is absent; any pause between polls is not visible here.
168 while (aaiWorkOnHand.get() != 0) {
170 if (LOG.isDebugEnabled()) {
171 LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
// From here on the counter tracks queued self links instead of entity types.
177 aaiWorkOnHand.set(selflinks.size());
178 allWorkEnumerated = true;
// Any exception raised in the guarded section is logged; the method still reports OK.
181 } catch (Exception exc) {
182 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
184 return OperationState.OK;
// Drains the self-link queue and, for every link that has both a link and an entity type and
// whose entity type resolves to an OXM descriptor, issues an asynchronous GET against AAI on
// aaiExecutor. The completion callback forwards the result to
// processEntityTypeSelfLinkResult().
// NOTE(review): else branches, continue statements and closing braces are absent from this
// listing; the branch structure described below is inferred from the visible lines only.
190 private void syncEntityTypes() {
192 while (selflinks.peek() != null) {
194 SelfLinkDescriptor linkDescriptor = selflinks.poll();
// The dequeued link no longer counts as queued work.
// NOTE(review): the counter dips here and is raised again just before the async call;
// confirm that isSyncDone cannot observe a transient zero in between.
195 aaiWorkOnHand.decrementAndGet();
197 OxmEntityDescriptor descriptor = null;
199 if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
201 descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
// Unknown entity type: log it. The statement that skips to the next link is not visible.
203 if (descriptor == null) {
204 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
205 // go to next element in iterator
// Describe the retrieval as a GET transaction for this self link.
209 NetworkTransaction txn = new NetworkTransaction();
210 txn.setDescriptor(descriptor);
211 txn.setLink(linkDescriptor.getSelfLink());
212 txn.setOperationType(HttpMethod.GET);
213 txn.setEntityType(linkDescriptor.getEntityType());
// Count the in-flight retrieval; the callback below decrements it again.
215 aaiWorkOnHand.incrementAndGet();
217 supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
218 .whenComplete((result, error) -> {
220 aaiWorkOnHand.decrementAndGet();
// Failure path: only the error message is logged.
223 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
// A missing result is logged with the self link; otherwise the result is processed.
225 if (result == null) {
226 LOG.error(AaiUiMsgs.SELF_LINK_GET_NO_RESPONSE, linkDescriptor.getSelfLink());
228 processEntityTypeSelfLinkResult(result);
237 * Process entity type self links.
239 * @param operationResult the operation result
// Parses the JSON body of a self-links-by-entity-type response and queues one
// SelfLinkDescriptor for every element of the result-data array whose resource-type is a
// known geo entity type. Elements with an unknown resource-type are logged as missing
// descriptors.
// NOTE(review): try openers, else lines and closing braces are absent from this listing.
241 private void processEntityTypeSelfLinks(OperationResult operationResult) {
243 JsonNode rootNode = null;
// operationResult is dereferenced without a null check.
245 final String jsonResult = operationResult.getResult();
// Only non-empty bodies of successful operations are parsed.
247 if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
250 rootNode = mapper.readTree(jsonResult);
251 } catch (IOException exc) {
252 LOG.error(AaiUiMsgs.ERROR_GENERIC, exc);
// NOTE(review): after a parse failure rootNode is still null, and resultData is null when
// the result-data field is absent; unless lines missing from this listing guard against
// that, the next dereferences would throw - confirm against the full file.
256 JsonNode resultData = rootNode.get("result-data");
257 ArrayNode resultDataArrayNode = null;
259 if (resultData.isArray()) {
260 resultDataArrayNode = (ArrayNode) resultData;
262 Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
263 JsonNode element = null;
265 while (elementIterator.hasNext()) {
266 element = elementIterator.next();
268 final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
269 final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
// Elements missing either field are ignored.
271 if (resourceType != null && resourceLink != null) {
// The queued link gets the nodes-only query suffix appended.
// NOTE(review): the effect of that suffix is defined by the AAI API, not by this file.
273 if (geoDescriptorMap.containsKey(resourceType)) {
274 selflinks.add(new SelfLinkDescriptor(resourceLink + "?nodes-only", resourceType));
276 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
277 // go to next element in iterator
289 * Process entity type self link result.
// Handles one entity retrieved from AAI: updates the AAI counters, builds a GeoIndexDocument
// from the response body and stores it asynchronously through a PUT transaction executed on
// esExecutor. JSON and I/O failures are logged, not rethrown.
// NOTE(review): return statements, try openers, else lines, the declaration of link and the
// call that attaches it to the new transaction are absent from this listing.
293 private void processEntityTypeSelfLinkResult(NetworkTransaction txn) {
295 updateActiveInventoryCounters(txn);
// Unsuccessful retrievals are handled here; the body of this branch is not visible.
297 if (!txn.getOperationResult().wasSuccessful()) {
// NOTE(review): this guard passes as soon as one of the two geo field names is non-empty;
// confirm whether both latitude and longitude names were meant to be required.
302 if (!(txn.getDescriptor().getGeoLatName().isEmpty()
303 && txn.getDescriptor().getGeoLongName().isEmpty())) {
305 GeoIndexDocument geoDoc = new GeoIndexDocument(oxmModelLoader);
307 final String jsonResult = txn.getOperationResult().getResult();
309 if (jsonResult != null && jsonResult.length() > 0) {
311 populateGeoDocument(geoDoc, jsonResult, txn.getDescriptor(), txn.getLink());
// Documents that fail their own validity check are reported at info level.
313 if (!geoDoc.isValidGeoDocument()) {
315 LOG.info(AaiUiMsgs.GEO_SYNC_IGNORING_ENTITY, geoDoc.getEntityType(), geoDoc.toString());
// Target URL is built from the document id, the index name and the literal default.
321 link = getElasticFullUrl("/" + geoDoc.getId(), getIndexName(), "default");
322 } catch (Exception exc) {
323 LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc);
// Separate transaction describing the store operation; entity type and descriptor are
// copied from the retrieval transaction.
328 NetworkTransaction n2 = new NetworkTransaction();
330 n2.setEntityType(txn.getEntityType());
331 n2.setDescriptor(txn.getDescriptor());
332 n2.setOperationType(HttpMethod.PUT);
// Count the in-flight store; the callback below decrements it again.
334 esWorkOnHand.incrementAndGet();
336 supplyAsync(new StoreDocumentTask(geoDoc, n2, esDataProvider), esExecutor)
337 .whenComplete((result, error) -> {
339 esWorkOnHand.decrementAndGet();
// Failure path logs the error message; otherwise counters are updated and the store result
// is inspected. The branch lines separating the two paths are not visible.
342 LOG.error(AaiUiMsgs.ES_STORE_FAILURE, error.getMessage());
344 updateElasticSearchCounters(result);
345 processStoreDocumentResult(result);
// Both handlers log the same message id.
352 } catch (JsonProcessingException exc) {
353 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
354 } catch (IOException exc) {
355 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
363 * Process store document result.
// Inspects the outcome of a document store and logs the operation result when the store was
// not successful. Nothing else is done with the result in the visible lines.
367 private void processStoreDocumentResult(NetworkTransaction txn) {
369 OperationResult or = txn.getOperationResult();
371 if (!or.wasSuccessful()) {
372 LOG.error(AaiUiMsgs.ES_STORE_FAILURE, or.toString());
// NOTE(review): the three lines below appear to be the remains of a commented-out block
// whose delimiters are missing from this listing; they are not live code.
374 * if(or.getResultCode() != 404 || (or.getResultCode() == 404 &&
375 * !synchronizerConfig.isResourceNotFoundErrorsSupressed())) { logger.error(
376 * "Skipping failed resource = " + "link" + " RC=[" + or.getResultCode() + "]. Message: " +
// Reports whether this synchronizer is currently performing a synchronization or is idle.
// NOTE(review): the condition that selects between the two return values (listing line 388)
// is absent from this view - presumably it is based on isSyncDone(); confirm.
386 public SynchronizerState getState() {
389 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
392 return SynchronizerState.IDLE;
397 * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
// Builds the statistics report for the current or last sync.
// Side effect: every call overwrites syncDurationInMs with the time elapsed since
// syncStartedTimeStampInMs, then delegates to the two-argument getStatReport overload.
400 public String getStatReport(boolean showFinalReport) {
401 syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
402 return this.getStatReport(syncDurationInMs, showFinalReport);
406 * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#shutdown()
// Shuts the synchronizer down by delegating to shutdownExecutors(), which is defined outside
// this listing.
409 public void shutdown() {
410 this.shutdownExecutors();
414 * Populate geo document.
417 * @param result the result
418 * @param resultDescriptor the result descriptor
419 * @param entityLink the entity link
420 * @throws JsonProcessingException the json processing exception
421 * @throws IOException Signals that an I/O exception has occurred.
// Fills the given geo document from an AAI entity response.
//   doc              - the GeoIndexDocument to populate (mutated in place)
//   result           - JSON text of the entity, parsed with the shared mapper
//   resultDescriptor - supplies the entity name, primary key attribute names and the names
//                      of the latitude and longitude fields
//   entityLink       - stored on the document as its self link
// Throws JsonProcessingException or IOException when the JSON text cannot be parsed.
423 protected void populateGeoDocument(GeoIndexDocument doc, String result,
424 OxmEntityDescriptor resultDescriptor, String entityLink)
425 throws JsonProcessingException, IOException {
427 doc.setSelfLink(entityLink);
428 doc.setEntityType(resultDescriptor.getEntityName());
430 JsonNode entityNode = mapper.readTree(result);
// Collect the values of all primary key attributes in descriptor order.
432 List<String> primaryKeyValues = new ArrayList<String>();
433 String pkeyValue = null;
435 for (String keyName : resultDescriptor.getPrimaryKeyAttributeName()) {
436 pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
// A missing key value is left out of the composite key and reported as a warning.
437 if (pkeyValue != null) {
438 primaryKeyValues.add(pkeyValue);
440 LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
// The composite primary key joins the collected values with a slash.
444 final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
445 doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
// Latitude and longitude are read as text from the fields named by the descriptor.
446 String geoLatKey = resultDescriptor.getGeoLatName();
447 String geoLongKey = resultDescriptor.getGeoLongName();
449 doc.setLatitude(NodeUtils.getNodeFieldAsText(entityNode, geoLatKey));
450 doc.setLongitude(NodeUtils.getNodeFieldAsText(entityNode, geoLongKey));
456 protected boolean isSyncDone() {
457 int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
459 if (totalWorkOnHand > 0 || !allWorkEnumerated) {