a177684323eec2d369eb5770e4fccbd6430dc8dc
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / topology / sync / GeoSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.topology.sync;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Deque;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.concurrent.ConcurrentLinkedDeque;
35 import java.util.function.Supplier;
36
37 import org.onap.aai.cl.api.Logger;
38 import org.onap.aai.cl.eelf.LoggerFactory;
39 import org.onap.aai.cl.mdc.MdcContext;
40 import org.onap.aai.restclient.client.OperationResult;
41 import org.onap.aai.sparky.config.oxm.GeoEntityLookup;
42 import org.onap.aai.sparky.config.oxm.GeoOxmEntityDescriptor;
43 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
44 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
45 import org.onap.aai.sparky.dal.NetworkTransaction;
46 import org.onap.aai.sparky.dal.rest.HttpMethod;
47 import org.onap.aai.sparky.inventory.entity.GeoIndexDocument;
48 import org.onap.aai.sparky.logging.AaiUiMsgs;
49 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
50 import org.onap.aai.sparky.sync.IndexSynchronizer;
51 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
52 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
53 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
54 import org.onap.aai.sparky.sync.enumeration.OperationState;
55 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
56 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
57 import org.onap.aai.sparky.sync.task.StoreDocumentTask;
58 import org.onap.aai.sparky.util.NodeUtils;
59 import org.slf4j.MDC;
60
61 import com.fasterxml.jackson.core.JsonProcessingException;
62 import com.fasterxml.jackson.databind.JsonNode;
63 import com.fasterxml.jackson.databind.node.ArrayNode;
64
65
66 /**
67  * The Class GeoSynchronizer.
68  */
69 public class GeoSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer {
70
71   private static final Logger LOG = LoggerFactory.getInstance().getLogger(GeoSynchronizer.class);
72
73   private boolean allWorkEnumerated;
74   private Deque<SelfLinkDescriptor> selflinks;
75   private GeoEntityLookup geoEntityLookup;
76   private OxmEntityLookup oxmEntityLookup;
77   
78   private Map<String, GeoOxmEntityDescriptor> geoDescriptorMap = null;
79
80   /**
81    * Instantiates a new geo synchronizer.
82    *
83    * @param indexName the index name
84    * @throws Exception the exception
85    */
86   public GeoSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
87       int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
88       NetworkStatisticsConfig esStatConfig, GeoEntityLookup geoEntityLookup,
89       OxmEntityLookup oxmEntityLookup) throws Exception {
90
91     super(LOG, "GEO", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig);
92     this.geoEntityLookup = geoEntityLookup;
93     this.oxmEntityLookup = oxmEntityLookup;
94     this.allWorkEnumerated = false;
95     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
96     this.synchronizerName = "Geo Synchronizer";
97     this.geoDescriptorMap = geoEntityLookup.getGeoEntityDescriptors();
98     this.aaiEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
99     this.esEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
100     this.syncDurationInMs = -1;
101   }
102
103
104   /* (non-Javadoc)
105    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
106    */
107   @Override
108   public OperationState doSync() {
109     this.syncDurationInMs = -1;
110     resetCounters();
111     setShouldSkipSync(false);
112     allWorkEnumerated = false;
113     syncStartedTimeStampInMs = System.currentTimeMillis();
114     String txnID = NodeUtils.getRandomTxnId();
115     MdcContext.initialize(txnID, "GeoSynchronizer", "", "Sync", "");
116         
117     collectAllTheWork();
118     return OperationState.OK;
119   }
120
121
122   /**
123    * Collect all the work.
124    *
125    * @return the operation state
126    */
127   public OperationState collectAllTheWork() {
128         final Map<String,String> contextMap = MDC.getCopyOfContextMap();
129
130     if (geoDescriptorMap.isEmpty()) {
131       setShouldSkipSync(true);
132       LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "geo entities");
133       return OperationState.ERROR;
134     }
135
136     Collection<String> syncTypes = geoDescriptorMap.keySet();
137
138     try {
139
140       /*
141        * launch a parallel async thread to process the documents for each entity-type (to max the of
142        * the configured executor anyway)
143        */
144
145       aaiWorkOnHand.set(syncTypes.size());
146
147       for (String key : syncTypes) {
148
149         supplyAsync(new Supplier<Void>() {
150
151           @Override
152           public Void get() {
153                 MDC.setContextMap(contextMap);
154             OperationResult typeLinksResult = null;
155             try {
156               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
157               aaiWorkOnHand.decrementAndGet();
158               processEntityTypeSelfLinks(typeLinksResult);
159             } catch (Exception exc) {
160               LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
161             }
162
163             return null;
164           }
165
166         }, aaiExecutor).whenComplete((result, error) -> {
167
168           if (error != null) {
169             LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
170           }
171         });
172
173       }
174
175       while (aaiWorkOnHand.get() != 0) {
176
177         if (LOG.isDebugEnabled()) {
178           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
179         }
180
181         Thread.sleep(1000);
182       }
183
184       aaiWorkOnHand.set(selflinks.size());
185       allWorkEnumerated = true;
186       syncEntityTypes();
187
188     } catch (Exception exc) {
189       LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
190     }
191     return OperationState.OK;
192   }
193
194   /**
195    * Sync entity types.
196    */
197   private void syncEntityTypes() {
198
199     while (selflinks.peek() != null) {
200
201       SelfLinkDescriptor linkDescriptor = selflinks.poll();
202       aaiWorkOnHand.decrementAndGet();
203
204       OxmEntityDescriptor descriptor = null;
205
206       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
207
208         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
209
210         if (descriptor == null) {
211           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
212           // go to next element in iterator
213           continue;
214         }
215
216         NetworkTransaction txn = new NetworkTransaction();
217         txn.setDescriptor(descriptor);
218         txn.setLink(linkDescriptor.getSelfLink());
219         txn.setOperationType(HttpMethod.GET);
220         txn.setEntityType(linkDescriptor.getEntityType());
221
222         aaiWorkOnHand.incrementAndGet();
223
224         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
225             .whenComplete((result, error) -> {
226
227               aaiWorkOnHand.decrementAndGet();
228
229               if (error != null) {
230                 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
231               } else {
232                 if (result == null) {
233                   LOG.error(AaiUiMsgs.SELF_LINK_GET_NO_RESPONSE, linkDescriptor.getSelfLink());
234                 } else {
235                   processEntityTypeSelfLinkResult(result);
236                 }
237               }
238             });
239       }
240     }
241   }
242
243   /**
244    * Process entity type self links.
245    *
246    * @param operationResult the operation result
247    */
248   private void processEntityTypeSelfLinks(OperationResult operationResult) {
249
250     JsonNode rootNode = null;
251
252     final String jsonResult = operationResult.getResult();
253
254     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
255
256       try {
257         rootNode = mapper.readTree(jsonResult);
258       } catch (IOException exc) {
259         LOG.error(AaiUiMsgs.ERROR_GENERIC, exc);
260       }
261
262       JsonNode resultData = rootNode.get("result-data");
263       ArrayNode resultDataArrayNode = null;
264
265       if (resultData.isArray()) {
266         resultDataArrayNode = (ArrayNode) resultData;
267
268         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
269         JsonNode element = null;
270
271         while (elementIterator.hasNext()) {
272           element = elementIterator.next();
273
274           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
275           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
276
277           if (resourceType != null && resourceLink != null) {
278
279             if (geoDescriptorMap.containsKey(resourceType)) {
280               selflinks.add(new SelfLinkDescriptor(resourceLink + "?nodes-only", resourceType));
281             } else {
282               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
283               // go to next element in iterator
284               continue;
285             }
286
287           }
288         }
289       }
290     }
291
292   }
293
294   /**
295    * Process entity type self link result.
296    *
297    * @param txn the txn
298    */
299   private void processEntityTypeSelfLinkResult(NetworkTransaction txn) {
300
301     updateActiveInventoryCounters(txn);
302
303     if (!txn.getOperationResult().wasSuccessful()) {
304       return;
305     }
306     
307     GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(txn.getEntityType());
308     
309     if ( descriptor == null ) {
310       return;
311     }
312     
313     try {
314       if (descriptor.hasGeoEntity()) {
315
316         GeoIndexDocument geoDoc = new GeoIndexDocument();
317
318         final String jsonResult = txn.getOperationResult().getResult();
319
320         if (jsonResult != null && jsonResult.length() > 0) {
321
322           populateGeoDocument(geoDoc, jsonResult, txn.getDescriptor(), txn.getLink());
323
324           if (!geoDoc.isValidGeoDocument()) {
325
326             LOG.info(AaiUiMsgs.GEO_SYNC_IGNORING_ENTITY, geoDoc.getEntityType(), geoDoc.toString());
327
328           } else {
329
330             String link = null;
331             try {
332               link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), geoDoc.getId());
333             } catch (Exception exc) {
334               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc);
335             }
336
337             if (link != null) {
338
339               NetworkTransaction n2 = new NetworkTransaction();
340               n2.setLink(link);
341               n2.setEntityType(txn.getEntityType());
342               n2.setDescriptor(txn.getDescriptor());
343               n2.setOperationType(HttpMethod.PUT);
344
345               esWorkOnHand.incrementAndGet();
346
347               supplyAsync(new StoreDocumentTask(geoDoc, n2, elasticSearchAdapter), esExecutor)
348                   .whenComplete((result, error) -> {
349
350                     esWorkOnHand.decrementAndGet();
351
352                     if (error != null) {
353                       LOG.error(AaiUiMsgs.ES_STORE_FAILURE, error.getMessage());
354                     } else {
355                       updateElasticSearchCounters(result);
356                       processStoreDocumentResult(result);
357                     }
358                   });
359             }
360           }
361         }
362       }
363     } catch (JsonProcessingException exc) {
364       LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
365     } catch (IOException exc) {
366       LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
367     }
368
369     return;
370   }
371
372
373   /**
374    * Process store document result.
375    *
376    * @param txn the txn
377    */
378   private void processStoreDocumentResult(NetworkTransaction txn) {
379
380     OperationResult or = txn.getOperationResult();
381
382     if (!or.wasSuccessful()) {
383       LOG.error(AaiUiMsgs.ES_STORE_FAILURE, or.toString());
384       /*
385        * if(or.getResultCode() != 404 || (or.getResultCode() == 404 &&
386        * !synchronizerConfig.isResourceNotFoundErrorsSupressed())) { logger.error(
387        * "Skipping failed resource = " + "link" + " RC=[" + or.getResultCode() + "]. Message: " +
388        * or.getResult()); }
389        */
390
391     }
392
393   }
394
395
396   @Override
397   public SynchronizerState getState() {
398
399     if (!isSyncDone()) {
400       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
401     }
402
403     return SynchronizerState.IDLE;
404
405   }
406
407   /* (non-Javadoc)
408    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
409    */
410   @Override
411   public String getStatReport(boolean showFinalReport) {
412     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
413     return this.getStatReport(syncDurationInMs, showFinalReport);
414   }
415
416   /* (non-Javadoc)
417    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
418    */
419   @Override
420   public void shutdown() {
421     this.shutdownExecutors();
422   }
423
424   /**
425    * Populate geo document.
426    *
427    * @param doc the doc
428    * @param result the result
429    * @param resultDescriptor the result descriptor
430    * @param entityLink the entity link
431    * @throws JsonProcessingException the json processing exception
432    * @throws IOException Signals that an I/O exception has occurred.
433    */
434   protected void populateGeoDocument(GeoIndexDocument doc, String result,
435       OxmEntityDescriptor resultDescriptor, String entityLink)
436           throws JsonProcessingException, IOException {
437
438     doc.setSelfLink(entityLink);
439     doc.setEntityType(resultDescriptor.getEntityName());
440
441     JsonNode entityNode = mapper.readTree(result);
442
443     List<String> primaryKeyValues = new ArrayList<String>();
444     String pkeyValue = null;
445
446     for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
447       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
448       if (pkeyValue != null) {
449         primaryKeyValues.add(pkeyValue);
450       } else {
451         LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
452       }
453     }
454
455     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
456     doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
457     
458     GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(resultDescriptor.getEntityName());
459     
460     String geoLatKey = descriptor.getGeoLatName();
461     String geoLongKey = descriptor.getGeoLongName();
462
463     doc.setLatitude(NodeUtils.getNodeFieldAsText(entityNode, geoLatKey));
464     doc.setLongitude(NodeUtils.getNodeFieldAsText(entityNode, geoLongKey));
465     doc.deriveFields();
466
467   }
468
469   @Override
470   protected boolean isSyncDone() {
471     if (shouldSkipSync()) {
472       syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
473       return true;
474     }
475
476     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
477
478     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
479       return false;
480     }
481
482     return true;
483   }
484
485 }