Update license and poms
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / topology / sync / GeoSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.topology.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Deque;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.ConcurrentLinkedDeque;
33 import java.util.function.Supplier;
34
35 import org.onap.aai.cl.api.Logger;
36 import org.onap.aai.cl.eelf.LoggerFactory;
37 import org.onap.aai.cl.mdc.MdcContext;
38 import org.onap.aai.restclient.client.OperationResult;
39 import org.onap.aai.sparky.config.oxm.GeoEntityLookup;
40 import org.onap.aai.sparky.config.oxm.GeoOxmEntityDescriptor;
41 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
42 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
43 import org.onap.aai.sparky.dal.NetworkTransaction;
44 import org.onap.aai.sparky.dal.rest.HttpMethod;
45 import org.onap.aai.sparky.inventory.entity.GeoIndexDocument;
46 import org.onap.aai.sparky.logging.AaiUiMsgs;
47 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
48 import org.onap.aai.sparky.sync.IndexSynchronizer;
49 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
50 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
51 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
52 import org.onap.aai.sparky.sync.enumeration.OperationState;
53 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
54 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
55 import org.onap.aai.sparky.sync.task.StoreDocumentTask;
56 import org.onap.aai.sparky.util.NodeUtils;
57 import org.slf4j.MDC;
58
59 import com.fasterxml.jackson.core.JsonProcessingException;
60 import com.fasterxml.jackson.databind.JsonNode;
61 import com.fasterxml.jackson.databind.node.ArrayNode;
62
63
64 /**
65  * The Class GeoSynchronizer.
66  */
67 public class GeoSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer {
68
69   private static final Logger LOG = LoggerFactory.getInstance().getLogger(GeoSynchronizer.class);
70
71   private boolean allWorkEnumerated;
72   private Deque<SelfLinkDescriptor> selflinks;
73   private GeoEntityLookup geoEntityLookup;
74   private OxmEntityLookup oxmEntityLookup;
75   
76   private Map<String, GeoOxmEntityDescriptor> geoDescriptorMap = null;
77
78   /**
79    * Instantiates a new geo synchronizer.
80    *
81    * @param indexName the index name
82    * @throws Exception the exception
83    */
84   public GeoSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
85       int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
86       NetworkStatisticsConfig esStatConfig, GeoEntityLookup geoEntityLookup,
87       OxmEntityLookup oxmEntityLookup) throws Exception {
88
89     super(LOG, "GEO", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig);
90     this.geoEntityLookup = geoEntityLookup;
91     this.oxmEntityLookup = oxmEntityLookup;
92     this.allWorkEnumerated = false;
93     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
94     this.synchronizerName = "Geo Synchronizer";
95     this.geoDescriptorMap = geoEntityLookup.getGeoEntityDescriptors();
96     this.aaiEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
97     this.esEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
98     this.syncDurationInMs = -1;
99   }
100
101
102   /* (non-Javadoc)
103    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
104    */
105   @Override
106   public OperationState doSync() {
107     this.syncDurationInMs = -1;
108     resetCounters();
109     setShouldSkipSync(false);
110     allWorkEnumerated = false;
111     syncStartedTimeStampInMs = System.currentTimeMillis();
112     String txnID = NodeUtils.getRandomTxnId();
113     MdcContext.initialize(txnID, "GeoSynchronizer", "", "Sync", "");
114         
115     collectAllTheWork();
116     return OperationState.OK;
117   }
118
119
120   /**
121    * Collect all the work.
122    *
123    * @return the operation state
124    */
125   public OperationState collectAllTheWork() {
126         final Map<String,String> contextMap = MDC.getCopyOfContextMap();
127
128     if (geoDescriptorMap.isEmpty()) {
129       setShouldSkipSync(true);
130       LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "geo entities");
131       return OperationState.ERROR;
132     }
133
134     Collection<String> syncTypes = geoDescriptorMap.keySet();
135
136     try {
137
138       /*
139        * launch a parallel async thread to process the documents for each entity-type (to max the of
140        * the configured executor anyway)
141        */
142
143       aaiWorkOnHand.set(syncTypes.size());
144
145       for (String key : syncTypes) {
146
147         supplyAsync(new Supplier<Void>() {
148
149           @Override
150           public Void get() {
151                 MDC.setContextMap(contextMap);
152             OperationResult typeLinksResult = null;
153             try {
154               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
155               aaiWorkOnHand.decrementAndGet();
156               processEntityTypeSelfLinks(typeLinksResult);
157             } catch (Exception exc) {
158               LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
159             }
160
161             return null;
162           }
163
164         }, aaiExecutor).whenComplete((result, error) -> {
165
166           if (error != null) {
167             LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
168           }
169         });
170
171       }
172
173       while (aaiWorkOnHand.get() != 0) {
174
175         if (LOG.isDebugEnabled()) {
176           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
177         }
178
179         Thread.sleep(1000);
180       }
181
182       aaiWorkOnHand.set(selflinks.size());
183       allWorkEnumerated = true;
184       syncEntityTypes();
185
186     } catch (Exception exc) {
187       LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
188     }
189     return OperationState.OK;
190   }
191
192   /**
193    * Sync entity types.
194    */
195   private void syncEntityTypes() {
196
197     while (selflinks.peek() != null) {
198
199       SelfLinkDescriptor linkDescriptor = selflinks.poll();
200       aaiWorkOnHand.decrementAndGet();
201
202       OxmEntityDescriptor descriptor = null;
203
204       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
205
206         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
207
208         if (descriptor == null) {
209           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
210           // go to next element in iterator
211           continue;
212         }
213
214         NetworkTransaction txn = new NetworkTransaction();
215         txn.setDescriptor(descriptor);
216         txn.setLink(linkDescriptor.getSelfLink());
217         txn.setOperationType(HttpMethod.GET);
218         txn.setEntityType(linkDescriptor.getEntityType());
219
220         aaiWorkOnHand.incrementAndGet();
221
222         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
223             .whenComplete((result, error) -> {
224
225               aaiWorkOnHand.decrementAndGet();
226
227               if (error != null) {
228                 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
229               } else {
230                 if (result == null) {
231                   LOG.error(AaiUiMsgs.SELF_LINK_GET_NO_RESPONSE, linkDescriptor.getSelfLink());
232                 } else {
233                   processEntityTypeSelfLinkResult(result);
234                 }
235               }
236             });
237       }
238     }
239   }
240
241   /**
242    * Process entity type self links.
243    *
244    * @param operationResult the operation result
245    */
246   private void processEntityTypeSelfLinks(OperationResult operationResult) {
247
248     JsonNode rootNode = null;
249
250     final String jsonResult = operationResult.getResult();
251
252     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
253
254       try {
255         rootNode = mapper.readTree(jsonResult);
256       } catch (IOException exc) {
257         LOG.error(AaiUiMsgs.ERROR_GENERIC, exc);
258       }
259
260       JsonNode resultData = rootNode.get("result-data");
261       ArrayNode resultDataArrayNode = null;
262
263       if (resultData.isArray()) {
264         resultDataArrayNode = (ArrayNode) resultData;
265
266         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
267         JsonNode element = null;
268
269         while (elementIterator.hasNext()) {
270           element = elementIterator.next();
271
272           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
273           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
274
275           if (resourceType != null && resourceLink != null) {
276
277             if (geoDescriptorMap.containsKey(resourceType)) {
278               selflinks.add(new SelfLinkDescriptor(resourceLink + "?nodes-only", resourceType));
279             } else {
280               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
281               // go to next element in iterator
282               continue;
283             }
284
285           }
286         }
287       }
288     }
289
290   }
291
292   /**
293    * Process entity type self link result.
294    *
295    * @param txn the txn
296    */
297   private void processEntityTypeSelfLinkResult(NetworkTransaction txn) {
298
299     updateActiveInventoryCounters(txn);
300
301     if (!txn.getOperationResult().wasSuccessful()) {
302       return;
303     }
304     
305     GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(txn.getEntityType());
306     
307     if ( descriptor == null ) {
308       return;
309     }
310     
311     try {
312       if (descriptor.hasGeoEntity()) {
313
314         GeoIndexDocument geoDoc = new GeoIndexDocument();
315
316         final String jsonResult = txn.getOperationResult().getResult();
317
318         if (jsonResult != null && jsonResult.length() > 0) {
319
320           populateGeoDocument(geoDoc, jsonResult, txn.getDescriptor(), txn.getLink());
321
322           if (!geoDoc.isValidGeoDocument()) {
323
324             LOG.info(AaiUiMsgs.GEO_SYNC_IGNORING_ENTITY, geoDoc.getEntityType(), geoDoc.toString());
325
326           } else {
327
328             String link = null;
329             try {
330               link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), geoDoc.getId());
331             } catch (Exception exc) {
332               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc);
333             }
334
335             if (link != null) {
336
337               NetworkTransaction n2 = new NetworkTransaction();
338               n2.setLink(link);
339               n2.setEntityType(txn.getEntityType());
340               n2.setDescriptor(txn.getDescriptor());
341               n2.setOperationType(HttpMethod.PUT);
342
343               esWorkOnHand.incrementAndGet();
344
345               supplyAsync(new StoreDocumentTask(geoDoc, n2, elasticSearchAdapter), esExecutor)
346                   .whenComplete((result, error) -> {
347
348                     esWorkOnHand.decrementAndGet();
349
350                     if (error != null) {
351                       LOG.error(AaiUiMsgs.ES_STORE_FAILURE, error.getMessage());
352                     } else {
353                       updateElasticSearchCounters(result);
354                       processStoreDocumentResult(result);
355                     }
356                   });
357             }
358           }
359         }
360       }
361     } catch (JsonProcessingException exc) {
362       LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
363     } catch (IOException exc) {
364       LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
365     }
366
367     return;
368   }
369
370
371   /**
372    * Process store document result.
373    *
374    * @param txn the txn
375    */
376   private void processStoreDocumentResult(NetworkTransaction txn) {
377
378     OperationResult or = txn.getOperationResult();
379
380     if (!or.wasSuccessful()) {
381       LOG.error(AaiUiMsgs.ES_STORE_FAILURE, or.toString());
382       /*
383        * if(or.getResultCode() != 404 || (or.getResultCode() == 404 &&
384        * !synchronizerConfig.isResourceNotFoundErrorsSupressed())) { logger.error(
385        * "Skipping failed resource = " + "link" + " RC=[" + or.getResultCode() + "]. Message: " +
386        * or.getResult()); }
387        */
388
389     }
390
391   }
392
393
394   @Override
395   public SynchronizerState getState() {
396
397     if (!isSyncDone()) {
398       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
399     }
400
401     return SynchronizerState.IDLE;
402
403   }
404
405   /* (non-Javadoc)
406    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
407    */
408   @Override
409   public String getStatReport(boolean showFinalReport) {
410     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
411     return this.getStatReport(syncDurationInMs, showFinalReport);
412   }
413
414   /* (non-Javadoc)
415    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
416    */
417   @Override
418   public void shutdown() {
419     this.shutdownExecutors();
420   }
421
422   /**
423    * Populate geo document.
424    *
425    * @param doc the doc
426    * @param result the result
427    * @param resultDescriptor the result descriptor
428    * @param entityLink the entity link
429    * @throws JsonProcessingException the json processing exception
430    * @throws IOException Signals that an I/O exception has occurred.
431    */
432   protected void populateGeoDocument(GeoIndexDocument doc, String result,
433       OxmEntityDescriptor resultDescriptor, String entityLink)
434           throws JsonProcessingException, IOException {
435
436     doc.setSelfLink(entityLink);
437     doc.setEntityType(resultDescriptor.getEntityName());
438
439     JsonNode entityNode = mapper.readTree(result);
440
441     List<String> primaryKeyValues = new ArrayList<String>();
442     String pkeyValue = null;
443
444     for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
445       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
446       if (pkeyValue != null) {
447         primaryKeyValues.add(pkeyValue);
448       } else {
449         LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
450       }
451     }
452
453     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
454     doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
455     
456     GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(resultDescriptor.getEntityName());
457     
458     String geoLatKey = descriptor.getGeoLatName();
459     String geoLongKey = descriptor.getGeoLongName();
460
461     doc.setLatitude(NodeUtils.getNodeFieldAsText(entityNode, geoLatKey));
462     doc.setLongitude(NodeUtils.getNodeFieldAsText(entityNode, geoLongKey));
463     doc.deriveFields();
464
465   }
466
467   @Override
468   protected boolean isSyncDone() {
469     if (shouldSkipSync()) {
470       syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
471       return true;
472     }
473
474     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
475
476     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
477       return false;
478     }
479
480     return true;
481   }
482
483 }