Convert Sparky to Spring-Boot
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / topology / sync / GeoSynchronizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25 package org.onap.aai.sparky.topology.sync;
26
27 import static java.util.concurrent.CompletableFuture.supplyAsync;
28
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Deque;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.ConcurrentLinkedDeque;
37 import java.util.function.Supplier;
38
39 import org.onap.aai.cl.api.Logger;
40 import org.onap.aai.cl.eelf.LoggerFactory;
41 import org.onap.aai.cl.mdc.MdcContext;
42 import org.onap.aai.restclient.client.OperationResult;
43 import org.onap.aai.sparky.config.oxm.GeoEntityLookup;
44 import org.onap.aai.sparky.config.oxm.GeoOxmEntityDescriptor;
45 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
46 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
47 import org.onap.aai.sparky.dal.NetworkTransaction;
48 import org.onap.aai.sparky.dal.rest.HttpMethod;
49 import org.onap.aai.sparky.inventory.entity.GeoIndexDocument;
50 import org.onap.aai.sparky.logging.AaiUiMsgs;
51 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
52 import org.onap.aai.sparky.sync.IndexSynchronizer;
53 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
54 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
55 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
56 import org.onap.aai.sparky.sync.enumeration.OperationState;
57 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
58 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
59 import org.onap.aai.sparky.sync.task.StoreDocumentTask;
60 import org.onap.aai.sparky.util.NodeUtils;
61 import org.slf4j.MDC;
62
63 import com.fasterxml.jackson.core.JsonProcessingException;
64 import com.fasterxml.jackson.databind.JsonNode;
65 import com.fasterxml.jackson.databind.node.ArrayNode;
66
67
68 /**
69  * The Class GeoSynchronizer.
70  */
71 public class GeoSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer {
72
73   private static final Logger LOG = LoggerFactory.getInstance().getLogger(GeoSynchronizer.class);
74
75   private boolean allWorkEnumerated;
76   private Deque<SelfLinkDescriptor> selflinks;
77   private GeoEntityLookup geoEntityLookup;
78   private OxmEntityLookup oxmEntityLookup;
79   
80   private Map<String, GeoOxmEntityDescriptor> geoDescriptorMap = null;
81
82   /**
83    * Instantiates a new geo synchronizer.
84    *
85    * @param indexName the index name
86    * @throws Exception the exception
87    */
88   public GeoSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
89       int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
90       NetworkStatisticsConfig esStatConfig, GeoEntityLookup geoEntityLookup,
91       OxmEntityLookup oxmEntityLookup) throws Exception {
92
93     super(LOG, "GEO", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig);
94     this.geoEntityLookup = geoEntityLookup;
95     this.oxmEntityLookup = oxmEntityLookup;
96     this.allWorkEnumerated = false;
97     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
98     this.synchronizerName = "Geo Synchronizer";
99     this.geoDescriptorMap = geoEntityLookup.getGeoEntityDescriptors();
100     this.aaiEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
101     this.esEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
102     this.syncDurationInMs = -1;
103   }
104
105
106   /* (non-Javadoc)
107    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
108    */
109   @Override
110   public OperationState doSync() {
111     this.syncDurationInMs = -1;
112     resetCounters();
113     setShouldSkipSync(false);
114     allWorkEnumerated = false;
115     syncStartedTimeStampInMs = System.currentTimeMillis();
116     String txnID = NodeUtils.getRandomTxnId();
117     MdcContext.initialize(txnID, "GeoSynchronizer", "", "Sync", "");
118         
119     collectAllTheWork();
120     return OperationState.OK;
121   }
122
123
124   /**
125    * Collect all the work.
126    *
127    * @return the operation state
128    */
129   public OperationState collectAllTheWork() {
130         final Map<String,String> contextMap = MDC.getCopyOfContextMap();
131
132     if (geoDescriptorMap.isEmpty()) {
133       setShouldSkipSync(true);
134       LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "geo entities");
135       return OperationState.ERROR;
136     }
137
138     Collection<String> syncTypes = geoDescriptorMap.keySet();
139
140     try {
141
142       /*
143        * launch a parallel async thread to process the documents for each entity-type (to max the of
144        * the configured executor anyway)
145        */
146
147       aaiWorkOnHand.set(syncTypes.size());
148
149       for (String key : syncTypes) {
150
151         supplyAsync(new Supplier<Void>() {
152
153           @Override
154           public Void get() {
155                 MDC.setContextMap(contextMap);
156             OperationResult typeLinksResult = null;
157             try {
158               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
159               aaiWorkOnHand.decrementAndGet();
160               processEntityTypeSelfLinks(typeLinksResult);
161             } catch (Exception exc) {
162               LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
163             }
164
165             return null;
166           }
167
168         }, aaiExecutor).whenComplete((result, error) -> {
169
170           if (error != null) {
171             LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
172           }
173         });
174
175       }
176
177       while (aaiWorkOnHand.get() != 0) {
178
179         if (LOG.isDebugEnabled()) {
180           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
181         }
182
183         Thread.sleep(1000);
184       }
185
186       aaiWorkOnHand.set(selflinks.size());
187       allWorkEnumerated = true;
188       syncEntityTypes();
189
190     } catch (Exception exc) {
191       LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
192     }
193     return OperationState.OK;
194   }
195
196   /**
197    * Sync entity types.
198    */
199   private void syncEntityTypes() {
200
201     while (selflinks.peek() != null) {
202
203       SelfLinkDescriptor linkDescriptor = selflinks.poll();
204       aaiWorkOnHand.decrementAndGet();
205
206       OxmEntityDescriptor descriptor = null;
207
208       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
209
210         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
211
212         if (descriptor == null) {
213           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
214           // go to next element in iterator
215           continue;
216         }
217
218         NetworkTransaction txn = new NetworkTransaction();
219         txn.setDescriptor(descriptor);
220         txn.setLink(linkDescriptor.getSelfLink());
221         txn.setOperationType(HttpMethod.GET);
222         txn.setEntityType(linkDescriptor.getEntityType());
223
224         aaiWorkOnHand.incrementAndGet();
225
226         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
227             .whenComplete((result, error) -> {
228
229               aaiWorkOnHand.decrementAndGet();
230
231               if (error != null) {
232                 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
233               } else {
234                 if (result == null) {
235                   LOG.error(AaiUiMsgs.SELF_LINK_GET_NO_RESPONSE, linkDescriptor.getSelfLink());
236                 } else {
237                   processEntityTypeSelfLinkResult(result);
238                 }
239               }
240             });
241       }
242     }
243   }
244
245   /**
246    * Process entity type self links.
247    *
248    * @param operationResult the operation result
249    */
250   private void processEntityTypeSelfLinks(OperationResult operationResult) {
251
252     JsonNode rootNode = null;
253
254     final String jsonResult = operationResult.getResult();
255
256     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
257
258       try {
259         rootNode = mapper.readTree(jsonResult);
260       } catch (IOException exc) {
261         LOG.error(AaiUiMsgs.ERROR_GENERIC, exc);
262       }
263
264       JsonNode resultData = rootNode.get("result-data");
265       ArrayNode resultDataArrayNode = null;
266
267       if (resultData.isArray()) {
268         resultDataArrayNode = (ArrayNode) resultData;
269
270         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
271         JsonNode element = null;
272
273         while (elementIterator.hasNext()) {
274           element = elementIterator.next();
275
276           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
277           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
278
279           if (resourceType != null && resourceLink != null) {
280
281             if (geoDescriptorMap.containsKey(resourceType)) {
282               selflinks.add(new SelfLinkDescriptor(resourceLink + "?nodes-only", resourceType));
283             } else {
284               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
285               // go to next element in iterator
286               continue;
287             }
288
289           }
290         }
291       }
292     }
293
294   }
295
296   /**
297    * Process entity type self link result.
298    *
299    * @param txn the txn
300    */
301   private void processEntityTypeSelfLinkResult(NetworkTransaction txn) {
302
303     updateActiveInventoryCounters(txn);
304
305     if (!txn.getOperationResult().wasSuccessful()) {
306       return;
307     }
308     
309     GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(txn.getEntityType());
310     
311     if ( descriptor == null ) {
312       return;
313     }
314     
315     try {
316       if (descriptor.hasGeoEntity()) {
317
318         GeoIndexDocument geoDoc = new GeoIndexDocument();
319
320         final String jsonResult = txn.getOperationResult().getResult();
321
322         if (jsonResult != null && jsonResult.length() > 0) {
323
324           populateGeoDocument(geoDoc, jsonResult, txn.getDescriptor(), txn.getLink());
325
326           if (!geoDoc.isValidGeoDocument()) {
327
328             LOG.info(AaiUiMsgs.GEO_SYNC_IGNORING_ENTITY, geoDoc.getEntityType(), geoDoc.toString());
329
330           } else {
331
332             String link = null;
333             try {
334               link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), geoDoc.getId());
335             } catch (Exception exc) {
336               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc);
337             }
338
339             if (link != null) {
340
341               NetworkTransaction n2 = new NetworkTransaction();
342               n2.setLink(link);
343               n2.setEntityType(txn.getEntityType());
344               n2.setDescriptor(txn.getDescriptor());
345               n2.setOperationType(HttpMethod.PUT);
346
347               esWorkOnHand.incrementAndGet();
348
349               supplyAsync(new StoreDocumentTask(geoDoc, n2, elasticSearchAdapter), esExecutor)
350                   .whenComplete((result, error) -> {
351
352                     esWorkOnHand.decrementAndGet();
353
354                     if (error != null) {
355                       LOG.error(AaiUiMsgs.ES_STORE_FAILURE, error.getMessage());
356                     } else {
357                       updateElasticSearchCounters(result);
358                       processStoreDocumentResult(result);
359                     }
360                   });
361             }
362           }
363         }
364       }
365     } catch (JsonProcessingException exc) {
366       LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
367     } catch (IOException exc) {
368       LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
369     }
370
371     return;
372   }
373
374
375   /**
376    * Process store document result.
377    *
378    * @param txn the txn
379    */
380   private void processStoreDocumentResult(NetworkTransaction txn) {
381
382     OperationResult or = txn.getOperationResult();
383
384     if (!or.wasSuccessful()) {
385       LOG.error(AaiUiMsgs.ES_STORE_FAILURE, or.toString());
386       /*
387        * if(or.getResultCode() != 404 || (or.getResultCode() == 404 &&
388        * !synchronizerConfig.isResourceNotFoundErrorsSupressed())) { logger.error(
389        * "Skipping failed resource = " + "link" + " RC=[" + or.getResultCode() + "]. Message: " +
390        * or.getResult()); }
391        */
392
393     }
394
395   }
396
397
398   @Override
399   public SynchronizerState getState() {
400
401     if (!isSyncDone()) {
402       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
403     }
404
405     return SynchronizerState.IDLE;
406
407   }
408
409   /* (non-Javadoc)
410    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
411    */
412   @Override
413   public String getStatReport(boolean showFinalReport) {
414     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
415     return this.getStatReport(syncDurationInMs, showFinalReport);
416   }
417
418   /* (non-Javadoc)
419    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
420    */
421   @Override
422   public void shutdown() {
423     this.shutdownExecutors();
424   }
425
426   /**
427    * Populate geo document.
428    *
429    * @param doc the doc
430    * @param result the result
431    * @param resultDescriptor the result descriptor
432    * @param entityLink the entity link
433    * @throws JsonProcessingException the json processing exception
434    * @throws IOException Signals that an I/O exception has occurred.
435    */
436   protected void populateGeoDocument(GeoIndexDocument doc, String result,
437       OxmEntityDescriptor resultDescriptor, String entityLink)
438           throws JsonProcessingException, IOException {
439
440     doc.setSelfLink(entityLink);
441     doc.setEntityType(resultDescriptor.getEntityName());
442
443     JsonNode entityNode = mapper.readTree(result);
444
445     List<String> primaryKeyValues = new ArrayList<String>();
446     String pkeyValue = null;
447
448     for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
449       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
450       if (pkeyValue != null) {
451         primaryKeyValues.add(pkeyValue);
452       } else {
453         LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
454       }
455     }
456
457     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
458     doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
459     
460     GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(resultDescriptor.getEntityName());
461     
462     String geoLatKey = descriptor.getGeoLatName();
463     String geoLongKey = descriptor.getGeoLongName();
464
465     doc.setLatitude(NodeUtils.getNodeFieldAsText(entityNode, geoLatKey));
466     doc.setLongitude(NodeUtils.getNodeFieldAsText(entityNode, geoLongKey));
467     doc.deriveFields();
468
469   }
470
471   @Override
472   protected boolean isSyncDone() {
473     if (shouldSkipSync()) {
474       syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
475       return true;
476     }
477
478     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
479
480     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
481       return false;
482     }
483
484     return true;
485   }
486
487 }