Adding UI extensibility
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / topology / sync / GeoSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.topology.sync;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Deque;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.concurrent.ConcurrentLinkedDeque;
35 import java.util.function.Supplier;
36
37 import org.onap.aai.cl.api.Logger;
38 import org.onap.aai.cl.eelf.LoggerFactory;
39 import org.onap.aai.cl.mdc.MdcContext;
40 import org.onap.aai.restclient.client.OperationResult;
41 import org.onap.aai.sparky.config.oxm.GeoEntityLookup;
42 import org.onap.aai.sparky.config.oxm.GeoOxmEntityDescriptor;
43 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
44 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
45 import org.onap.aai.sparky.dal.NetworkTransaction;
46 import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
47 import org.onap.aai.sparky.dal.rest.HttpMethod;
48 import org.onap.aai.sparky.inventory.entity.GeoIndexDocument;
49 import org.onap.aai.sparky.logging.AaiUiMsgs;
50 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
51 import org.onap.aai.sparky.sync.IndexSynchronizer;
52 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
53 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
54 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
55 import org.onap.aai.sparky.sync.enumeration.OperationState;
56 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
57 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
58 import org.onap.aai.sparky.sync.task.StoreDocumentTask;
59 import org.onap.aai.sparky.util.NodeUtils;
60 import org.slf4j.MDC;
61
62 import com.fasterxml.jackson.core.JsonProcessingException;
63 import com.fasterxml.jackson.databind.JsonNode;
64 import com.fasterxml.jackson.databind.node.ArrayNode;
65
66
67 /**
68  * The Class GeoSynchronizer.
69  */
70 public class GeoSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer {
71
72   private static final Logger LOG = LoggerFactory.getInstance().getLogger(GeoSynchronizer.class);
73
74   private boolean allWorkEnumerated;
75   private Deque<SelfLinkDescriptor> selflinks;
76
77   private ElasticSearchConfig elasticConfig = null;
78   private Map<String, GeoOxmEntityDescriptor> geoDescriptorMap = null;
79
80   /**
81    * Instantiates a new geo synchronizer.
82    *
83    * @param indexName the index name
84    * @throws Exception the exception
85    */
86   public GeoSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
87       int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
88       NetworkStatisticsConfig esStatConfig) throws Exception {
89
90     super(LOG, "GEO", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),
91         aaiStatConfig, esStatConfig);
92     this.allWorkEnumerated = false;
93     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
94     this.synchronizerName = "Geo Synchronizer";
95     this.geoDescriptorMap = GeoEntityLookup.getInstance().getGeoEntityDescriptors();
96     this.aaiEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
97     this.esEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
98     this.syncDurationInMs = -1;
99   }
100
101
102   /*
103    * (non-Javadoc)
104    * 
105    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
106    */
107   @Override
108   public OperationState doSync() {
109     this.syncDurationInMs = -1;
110     resetCounters();
111     setShouldSkipSync(false);
112     allWorkEnumerated = false;
113     syncStartedTimeStampInMs = System.currentTimeMillis();
114     String txnID = NodeUtils.getRandomTxnId();
115     MdcContext.initialize(txnID, "GeoSynchronizer", "", "Sync", "");
116
117     collectAllTheWork();
118     return OperationState.OK;
119   }
120
121
122   /**
123    * Collect all the work.
124    *
125    * @return the operation state
126    */
127   public OperationState collectAllTheWork() {
128     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
129     if (elasticConfig == null) {
130       try {
131         elasticConfig = ElasticSearchConfig.getConfig();
132       } catch (Exception exc) {
133         LOG.error(AaiUiMsgs.CONFIGURATION_ERROR, "Search");
134       }
135     }
136
137     if (geoDescriptorMap.isEmpty()) {
138       setShouldSkipSync(true);
139       LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "geo entities");
140       return OperationState.ERROR;
141     }
142
143     Collection<String> syncTypes = geoDescriptorMap.keySet();
144
145     try {
146
147       /*
148        * launch a parallel async thread to process the documents for each entity-type (to max the of
149        * the configured executor anyway)
150        */
151
152       aaiWorkOnHand.set(syncTypes.size());
153
154       for (String key : syncTypes) {
155
156         supplyAsync(new Supplier<Void>() {
157
158           @Override
159           public Void get() {
160             MDC.setContextMap(contextMap);
161             OperationResult typeLinksResult = null;
162             try {
163               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
164               aaiWorkOnHand.decrementAndGet();
165               processEntityTypeSelfLinks(typeLinksResult);
166             } catch (Exception exc) {
167               LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
168             }
169
170             return null;
171           }
172
173         }, aaiExecutor).whenComplete((result, error) -> {
174
175           if (error != null) {
176             LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
177           }
178         });
179
180       }
181
182       while (aaiWorkOnHand.get() != 0) {
183
184         if (LOG.isDebugEnabled()) {
185           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
186         }
187
188         Thread.sleep(1000);
189       }
190
191       aaiWorkOnHand.set(selflinks.size());
192       allWorkEnumerated = true;
193       syncEntityTypes();
194
195     } catch (Exception exc) {
196       LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
197     }
198     return OperationState.OK;
199   }
200
201   /**
202    * Sync entity types.
203    */
204   private void syncEntityTypes() {
205
206     while (selflinks.peek() != null) {
207
208       SelfLinkDescriptor linkDescriptor = selflinks.poll();
209       aaiWorkOnHand.decrementAndGet();
210
211       OxmEntityDescriptor descriptor = null;
212
213       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
214
215         descriptor = OxmEntityLookup.getInstance().getEntityDescriptors()
216             .get(linkDescriptor.getEntityType());
217
218         if (descriptor == null) {
219           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
220           // go to next element in iterator
221           continue;
222         }
223
224         NetworkTransaction txn = new NetworkTransaction();
225         txn.setDescriptor(descriptor);
226         txn.setLink(linkDescriptor.getSelfLink());
227         txn.setOperationType(HttpMethod.GET);
228         txn.setEntityType(linkDescriptor.getEntityType());
229
230         aaiWorkOnHand.incrementAndGet();
231
232         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
233             .whenComplete((result, error) -> {
234
235               aaiWorkOnHand.decrementAndGet();
236
237               if (error != null) {
238                 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
239               } else {
240                 if (result == null) {
241                   LOG.error(AaiUiMsgs.SELF_LINK_GET_NO_RESPONSE, linkDescriptor.getSelfLink());
242                 } else {
243                   processEntityTypeSelfLinkResult(result);
244                 }
245               }
246             });
247       }
248     }
249   }
250
251   /**
252    * Process entity type self links.
253    *
254    * @param operationResult the operation result
255    */
256   private void processEntityTypeSelfLinks(OperationResult operationResult) {
257
258     JsonNode rootNode = null;
259
260     final String jsonResult = operationResult.getResult();
261
262     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
263
264       try {
265         rootNode = mapper.readTree(jsonResult);
266       } catch (IOException exc) {
267         LOG.error(AaiUiMsgs.ERROR_GENERIC, exc);
268       }
269
270       JsonNode resultData = rootNode.get("result-data");
271       ArrayNode resultDataArrayNode = null;
272
273       if (resultData.isArray()) {
274         resultDataArrayNode = (ArrayNode) resultData;
275
276         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
277         JsonNode element = null;
278
279         while (elementIterator.hasNext()) {
280           element = elementIterator.next();
281
282           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
283           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
284
285           if (resourceType != null && resourceLink != null) {
286
287             if (geoDescriptorMap.containsKey(resourceType)) {
288               selflinks.add(new SelfLinkDescriptor(resourceLink + "?nodes-only", resourceType));
289             } else {
290               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
291               // go to next element in iterator
292               continue;
293             }
294
295           }
296         }
297       }
298     }
299
300   }
301
302   /**
303    * Process entity type self link result.
304    *
305    * @param txn the txn
306    */
307   private void processEntityTypeSelfLinkResult(NetworkTransaction txn) {
308
309     updateActiveInventoryCounters(txn);
310
311     if (!txn.getOperationResult().wasSuccessful()) {
312       return;
313     }
314
315     GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(txn.getEntityType());
316
317     if (descriptor == null) {
318       return;
319     }
320
321     try {
322       if (descriptor.hasGeoEntity()) {
323
324         GeoIndexDocument geoDoc = new GeoIndexDocument();
325
326         final String jsonResult = txn.getOperationResult().getResult();
327
328         if (jsonResult != null && jsonResult.length() > 0) {
329
330           populateGeoDocument(geoDoc, jsonResult, txn.getDescriptor(), txn.getLink());
331
332           if (!geoDoc.isValidGeoDocument()) {
333
334             LOG.info(AaiUiMsgs.GEO_SYNC_IGNORING_ENTITY, geoDoc.getEntityType(), geoDoc.toString());
335
336           } else {
337
338             String link = null;
339             try {
340               link = getElasticFullUrl("/" + geoDoc.getId(), getIndexName(), "default");
341             } catch (Exception exc) {
342               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc);
343             }
344
345             if (link != null) {
346
347               NetworkTransaction n2 = new NetworkTransaction();
348               n2.setLink(link);
349               n2.setEntityType(txn.getEntityType());
350               n2.setDescriptor(txn.getDescriptor());
351               n2.setOperationType(HttpMethod.PUT);
352
353               esWorkOnHand.incrementAndGet();
354
355               supplyAsync(new StoreDocumentTask(geoDoc, n2, elasticSearchAdapter), esExecutor)
356                   .whenComplete((result, error) -> {
357
358                     esWorkOnHand.decrementAndGet();
359
360                     if (error != null) {
361                       LOG.error(AaiUiMsgs.ES_STORE_FAILURE, error.getMessage());
362                     } else {
363                       updateElasticSearchCounters(result);
364                       processStoreDocumentResult(result);
365                     }
366                   });
367             }
368           }
369         }
370       }
371     } catch (JsonProcessingException exc) {
372       LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
373     } catch (IOException exc) {
374       LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
375     }
376
377     return;
378   }
379
380
381   /**
382    * Process store document result.
383    *
384    * @param txn the txn
385    */
386   private void processStoreDocumentResult(NetworkTransaction txn) {
387
388     OperationResult or = txn.getOperationResult();
389
390     if (!or.wasSuccessful()) {
391       LOG.error(AaiUiMsgs.ES_STORE_FAILURE, or.toString());
392       /*
393        * if(or.getResultCode() != 404 || (or.getResultCode() == 404 &&
394        * !synchronizerConfig.isResourceNotFoundErrorsSupressed())) { logger.error(
395        * "Skipping failed resource = " + "link" + " RC=[" + or.getResultCode() + "]. Message: " +
396        * or.getResult()); }
397        */
398
399     }
400
401   }
402
403
404   @Override
405   public SynchronizerState getState() {
406
407     if (!isSyncDone()) {
408       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
409     }
410
411     return SynchronizerState.IDLE;
412
413   }
414
415   /*
416    * (non-Javadoc)
417    * 
418    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
419    */
420   @Override
421   public String getStatReport(boolean showFinalReport) {
422     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
423     return this.getStatReport(syncDurationInMs, showFinalReport);
424   }
425
426   /*
427    * (non-Javadoc)
428    * 
429    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
430    */
431   @Override
432   public void shutdown() {
433     this.shutdownExecutors();
434   }
435
436   /**
437    * Populate geo document.
438    *
439    * @param doc the doc
440    * @param result the result
441    * @param resultDescriptor the result descriptor
442    * @param entityLink the entity link
443    * @throws JsonProcessingException the json processing exception
444    * @throws IOException Signals that an I/O exception has occurred.
445    */
446   protected void populateGeoDocument(GeoIndexDocument doc, String result,
447       OxmEntityDescriptor resultDescriptor, String entityLink)
448       throws JsonProcessingException, IOException {
449
450     doc.setSelfLink(entityLink);
451     doc.setEntityType(resultDescriptor.getEntityName());
452
453     JsonNode entityNode = mapper.readTree(result);
454
455     List<String> primaryKeyValues = new ArrayList<String>();
456     String pkeyValue = null;
457
458     for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
459       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
460       if (pkeyValue != null) {
461         primaryKeyValues.add(pkeyValue);
462       } else {
463         LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
464       }
465     }
466
467     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
468     doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
469
470     GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(resultDescriptor.getEntityName());
471
472     String geoLatKey = descriptor.getGeoLatName();
473     String geoLongKey = descriptor.getGeoLongName();
474
475     doc.setLatitude(NodeUtils.getNodeFieldAsText(entityNode, geoLatKey));
476     doc.setLongitude(NodeUtils.getNodeFieldAsText(entityNode, geoLongKey));
477     doc.deriveFields();
478
479   }
480
481   @Override
482   protected boolean isSyncDone() {
483     if (shouldSkipSync()) {
484       syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
485       return true;
486     }
487
488     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
489
490     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
491       return false;
492     }
493
494     return true;
495   }
496
497 }