2 * ============LICENSE_START===================================================
3 * SPARKY (AAI UI service)
4 * ============================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=====================================================
22 * ECOMP and OpenECOMP are trademarks
23 * and service marks of AT&T Intellectual Property.
26 package org.openecomp.sparky.synchronizer;
28 import static java.util.concurrent.CompletableFuture.supplyAsync;
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.UnknownHostException;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.Deque;
36 import java.util.Iterator;
37 import java.util.List;
39 import java.util.concurrent.ConcurrentLinkedDeque;
40 import java.util.function.Supplier;
42 import org.openecomp.cl.api.Logger;
43 import org.openecomp.cl.eelf.LoggerFactory;
44 import org.openecomp.sparky.config.oxm.OxmEntityDescriptor;
45 import org.openecomp.sparky.dal.NetworkTransaction;
46 import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig;
47 import org.openecomp.sparky.dal.rest.HttpMethod;
48 import org.openecomp.sparky.dal.rest.OperationResult;
49 import org.openecomp.sparky.inventory.entity.GeoIndexDocument;
50 import org.openecomp.sparky.logging.AaiUiMsgs;
51 import org.openecomp.sparky.synchronizer.entity.SelfLinkDescriptor;
52 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
53 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
54 import org.openecomp.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
55 import org.openecomp.sparky.synchronizer.task.StoreDocumentTask;
56 import org.openecomp.sparky.util.NodeUtils;
59 import org.openecomp.cl.mdc.MdcContext;
60 import com.fasterxml.jackson.core.JsonProcessingException;
61 import com.fasterxml.jackson.databind.JsonNode;
62 import com.fasterxml.jackson.databind.node.ArrayNode;
66 * The Class GeoSynchronizer.
68 public class GeoSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer {
// Class-wide logger obtained from the EELF LoggerFactory for GeoSynchronizer.
70 private static final Logger LOG = LoggerFactory.getInstance().getLogger(GeoSynchronizer.class);
// Cleared at the start of doSync() and set to true by collectAllTheWork() once the self-link
// count has been recorded; isSyncDone() reads it.
72 private boolean allWorkEnumerated;
// Queue of self-links: filled by processEntityTypeSelfLinks(), drained by syncEntityTypes().
73 private Deque<SelfLinkDescriptor> selflinks;
// Loaded on first use inside collectAllTheWork(); remains null until then.
75 private ElasticSearchConfig elasticConfig = null;
// Geo entity descriptors fetched from the OXM model loader in the constructor; the keys are
// the entity types that get synchronized.
76 private Map<String, OxmEntityDescriptor> geoDescriptorMap = null;
/**
 * Instantiates a new geo synchronizer.
 *
 * <p>Passes the "GEO" tag and the target index to the parent synchronizer, starts with an empty
 * self-link queue, and seeds the {@code aaiEntityStats} and {@code esEntityStats} counters from
 * the geo entity descriptors supplied by the OXM model loader.
 *
 * @param indexName the index name
 * @throws Exception the exception
 */
public GeoSynchronizer(String indexName) throws Exception {
  super(LOG, "GEO", 2, 5, 5, indexName);

  synchronizerName = "Geo Synchronizer";
  allWorkEnumerated = false;
  selflinks = new ConcurrentLinkedDeque<>();

  // The descriptor map must be in place before either stats object is initialised from it.
  geoDescriptorMap = oxmModelLoader.getGeoEntityDescriptors();
  aaiEntityStats.initializeCountersFromOxmEntityDescriptors(geoDescriptorMap);
  esEntityStats.initializeCountersFromOxmEntityDescriptors(geoDescriptorMap);
}
98 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
// Starts a synchronization pass. The statements visible here clear allWorkEnumerated, record
// the start time in syncStartedTimeStampInMs, initialise the logging MDC with a fresh random
// transaction id, and return OperationState.OK.
// NOTE(review): this listing skips original lines 102 and 107-108; any statements on those
// lines are not shown here - confirm against the full file.
101 public OperationState doSync() {
103 allWorkEnumerated = false;
104 syncStartedTimeStampInMs = System.currentTimeMillis();
105 String txnID = NodeUtils.getRandomTxnId();
106 MdcContext.initialize(txnID, "GeoSynchronizer", "", "Sync", "");
109 return OperationState.OK;
114 * Collect all the work.
116 * @return the operation state
// Enumerates the self-links of every geo entity type. Visible flow:
//   1. load the ElasticSearch config on first use, logging CONFIGURATION_ERROR on failure;
//   2. return OperationState.ERROR when geoDescriptorMap is empty;
//   3. submit one asynchronous self-link lookup per entity type on aaiExecutor;
//   4. loop until aaiWorkOnHand reaches zero;
//   5. set aaiWorkOnHand to the number of queued self-links and mark all work as enumerated.
// The final visible statement returns OperationState.OK.
// NOTE(review): this listing omits a number of original lines (for example 121, 147-149,
// 158-162, 176-180 and 183-184); statements on those lines are not shown here.
118 public OperationState collectAllTheWork() {
// Snapshot of the calling thread's MDC; the async supplier below re-installs it with
// MDC.setContextMap(contextMap).
119 final Map<String,String> contextMap = MDC.getCopyOfContextMap();
120 if (elasticConfig == null) {
122 elasticConfig = ElasticSearchConfig.getConfig();
123 } catch (Exception exc) {
124 LOG.error(AaiUiMsgs.CONFIGURATION_ERROR, "Search");
// Without any geo descriptors there is nothing to synchronize.
128 if (geoDescriptorMap.isEmpty()) {
129 LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "geo entities");
130 return OperationState.ERROR;
133 Collection<String> syncTypes = geoDescriptorMap.keySet();
138 * launch a parallel async thread to process the documents for each entity-type (to max the of
139 * the configured executor anyway)
// One outstanding unit of work per entity type.
142 aaiWorkOnHand.set(syncTypes.size());
144 for (String key : syncTypes) {
146 supplyAsync(new Supplier<Void>() {
150 MDC.setContextMap(contextMap);
151 OperationResult typeLinksResult = null;
// NOTE(review): the counter is decremented before processEntityTypeSelfLinks() has queued the
// links, and it is not decremented on the exception path shown below. Check whether the wait
// loop further down can therefore finish early or never finish.
153 typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(key);
154 aaiWorkOnHand.decrementAndGet();
155 processEntityTypeSelfLinks(typeLinksResult);
156 } catch (Exception exc) {
157 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
163 }, aaiExecutor).whenComplete((result, error) -> {
166 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
// Wait for every per-type lookup to report in; apart from the debug log, the body of this
// loop is not visible in this listing.
172 while (aaiWorkOnHand.get() != 0) {
174 if (LOG.isDebugEnabled()) {
175 LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
// From here on the counter tracks queued self-links rather than entity types.
181 aaiWorkOnHand.set(selflinks.size());
182 allWorkEnumerated = true;
185 } catch (Exception exc) {
186 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
188 return OperationState.OK;
194 private void syncEntityTypes() {
196 while (selflinks.peek() != null) {
198 SelfLinkDescriptor linkDescriptor = selflinks.poll();
199 aaiWorkOnHand.decrementAndGet();
201 OxmEntityDescriptor descriptor = null;
203 if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
205 descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
207 if (descriptor == null) {
208 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
209 // go to next element in iterator
213 NetworkTransaction txn = new NetworkTransaction();
214 txn.setDescriptor(descriptor);
215 txn.setLink(linkDescriptor.getSelfLink());
216 txn.setOperationType(HttpMethod.GET);
217 txn.setEntityType(linkDescriptor.getEntityType());
219 aaiWorkOnHand.incrementAndGet();
221 supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
222 .whenComplete((result, error) -> {
224 aaiWorkOnHand.decrementAndGet();
227 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
229 if (result == null) {
230 LOG.error(AaiUiMsgs.SELF_LINK_GET_NO_RESPONSE, linkDescriptor.getSelfLink());
232 processEntityTypeSelfLinkResult(result);
241 * Process entity type self links.
243 * @param operationResult the operation result
245 private void processEntityTypeSelfLinks(OperationResult operationResult) {
247 JsonNode rootNode = null;
249 final String jsonResult = operationResult.getResult();
251 if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
254 rootNode = mapper.readTree(jsonResult);
255 } catch (IOException exc) {
256 LOG.error(AaiUiMsgs.ERROR_GENERIC, exc);
259 JsonNode resultData = rootNode.get("result-data");
260 ArrayNode resultDataArrayNode = null;
262 if (resultData.isArray()) {
263 resultDataArrayNode = (ArrayNode) resultData;
265 Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
266 JsonNode element = null;
268 while (elementIterator.hasNext()) {
269 element = elementIterator.next();
271 final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
272 final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
274 if (resourceType != null && resourceLink != null) {
276 if (geoDescriptorMap.containsKey(resourceType)) {
277 selflinks.add(new SelfLinkDescriptor(resourceLink + "?nodes-only", resourceType));
279 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
280 // go to next element in iterator
292 * Process entity type self link result.
// Handles one completed active-inventory retrieval: updates the inventory counters, builds a
// GeoIndexDocument from the returned JSON and, for a valid document, submits an asynchronous
// PUT through StoreDocumentTask on esExecutor.
// NOTE(review): this listing omits several original lines inside the method (for example
// 301-304, 319-323, 327-330 and 332), including the declaration of `link` and the bodies of
// some branches; those parts are not documented here.
296 private void processEntityTypeSelfLinkResult(NetworkTransaction txn) {
298 updateActiveInventoryCounters(txn);
// Branch for unsuccessful retrievals; its body is not shown in this listing.
300 if (!txn.getOperationResult().wasSuccessful()) {
// Proceed only when the descriptor's latitude and longitude attribute names are not both empty.
// NOTE(review): as written, a descriptor that has only one of the two names still passes -
// confirm that this is intended.
305 if (!(txn.getDescriptor().getGeoLatName().isEmpty()
306 && txn.getDescriptor().getGeoLongName().isEmpty())) {
308 GeoIndexDocument geoDoc = new GeoIndexDocument(oxmModelLoader);
310 final String jsonResult = txn.getOperationResult().getResult();
312 if (jsonResult != null && jsonResult.length() > 0) {
// Fill the document from the entity JSON; the catch clauses at the end of the method log
// JSON and I/O failures as JSON_PROCESSING_ERROR.
314 populateGeoDocument(geoDoc, jsonResult, txn.getDescriptor(), txn.getLink());
316 if (!geoDoc.isValidGeoDocument()) {
318 LOG.info(AaiUiMsgs.GEO_SYNC_IGNORING_ENTITY, geoDoc.getEntityType(), geoDoc.toString());
// Target URL for this document id in the synchronizer's index, built by getElasticFullUrl().
324 link = getElasticFullUrl("/" + geoDoc.getId(), getIndexName(), "default");
325 } catch (Exception exc) {
326 LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc);
// Second transaction describing the store request, copying the entity type and descriptor
// from the retrieval transaction.
331 NetworkTransaction n2 = new NetworkTransaction();
333 n2.setEntityType(txn.getEntityType());
334 n2.setDescriptor(txn.getDescriptor());
335 n2.setOperationType(HttpMethod.PUT);
// Count the store as outstanding until its completion callback has run.
337 esWorkOnHand.incrementAndGet();
339 supplyAsync(new StoreDocumentTask(geoDoc, n2, esDataProvider), esExecutor)
340 .whenComplete((result, error) -> {
342 esWorkOnHand.decrementAndGet();
345 LOG.error(AaiUiMsgs.ES_STORE_FAILURE, error.getMessage());
347 updateElasticSearchCounters(result);
348 processStoreDocumentResult(result);
355 } catch (JsonProcessingException exc) {
356 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
357 } catch (IOException exc) {
358 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
366 * Process store document result.
// Inspects the outcome of a document store request and logs ES_STORE_FAILURE, together with
// the OperationResult's string form, when the request was not successful.
// NOTE(review): the lines numbered 377-379 below are a fragment of a disabled block dealing
// with 404 result codes; the remainder of that block is not shown in this listing.
370 private void processStoreDocumentResult(NetworkTransaction txn) {
372 OperationResult or = txn.getOperationResult();
374 if (!or.wasSuccessful()) {
375 LOG.error(AaiUiMsgs.ES_STORE_FAILURE, or.toString());
377 * if(or.getResultCode() != 404 || (or.getResultCode() == 404 &&
378 * !synchronizerConfig.isResourceNotFoundErrorsSupressed())) { logger.error(
379 * "Skipping failed resource = " + "link" + " RC=[" + or.getResultCode() + "]. Message: " +
// Reports the synchronizer state: PERFORMING_SYNCHRONIZATION on one path, IDLE otherwise.
// NOTE(review): the condition guarding the first return (original lines 390-391) is not shown
// in this listing; presumably it consults isSyncDone() - confirm against the full file.
389 public SynchronizerState getState() {
392 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
395 return SynchronizerState.IDLE;
400 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
// Builds the statistics report by delegating to another getStatReport overload, passing the
// milliseconds elapsed since syncStartedTimeStampInMs as the first argument.
// NOTE(review): the rest of the argument list (original line 405) is not shown in this
// listing; presumably showFinalReport is forwarded - confirm.
403 public String getStatReport(boolean showFinalReport) {
404 return this.getStatReport(System.currentTimeMillis() - syncStartedTimeStampInMs,
409 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
// Shuts the synchronizer down by delegating to shutdownExecutors().
412 public void shutdown() {
413 this.shutdownExecutors();
417 * Populate geo document.
420 * @param result the result
421 * @param resultDescriptor the result descriptor
422 * @param entityLink the entity link
423 * @throws JsonProcessingException the json processing exception
424 * @throws IOException Signals that an I/O exception has occurred.
426 protected void populateGeoDocument(GeoIndexDocument doc, String result,
427 OxmEntityDescriptor resultDescriptor, String entityLink)
428 throws JsonProcessingException, IOException {
430 doc.setSelfLink(entityLink);
431 doc.setEntityType(resultDescriptor.getEntityName());
433 JsonNode entityNode = mapper.readTree(result);
435 List<String> primaryKeyValues = new ArrayList<String>();
436 String pkeyValue = null;
438 for (String keyName : resultDescriptor.getPrimaryKeyAttributeName()) {
439 pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
440 if (pkeyValue != null) {
441 primaryKeyValues.add(pkeyValue);
443 LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
447 final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
448 doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
449 String geoLatKey = resultDescriptor.getGeoLatName();
450 String geoLongKey = resultDescriptor.getGeoLongName();
452 doc.setLatitude(NodeUtils.getNodeFieldAsText(entityNode, geoLatKey));
453 doc.setLongitude(NodeUtils.getNodeFieldAsText(entityNode, geoLongKey));
459 protected boolean isSyncDone() {
460 int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
462 if (totalWorkOnHand > 0 || !allWorkEnumerated) {