Fix potential null pointer places
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / topology / sync / GeoSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.topology.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Deque;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.ConcurrentLinkedDeque;
33 import java.util.function.Supplier;
34
35 import org.onap.aai.cl.api.Logger;
36 import org.onap.aai.cl.eelf.LoggerFactory;
37 import org.onap.aai.cl.mdc.MdcContext;
38 import org.onap.aai.restclient.client.OperationResult;
39 import org.onap.aai.sparky.config.oxm.GeoEntityLookup;
40 import org.onap.aai.sparky.config.oxm.GeoOxmEntityDescriptor;
41 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
42 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
43 import org.onap.aai.sparky.dal.NetworkTransaction;
44 import org.onap.aai.sparky.dal.rest.HttpMethod;
45 import org.onap.aai.sparky.inventory.entity.GeoIndexDocument;
46 import org.onap.aai.sparky.logging.AaiUiMsgs;
47 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
48 import org.onap.aai.sparky.sync.IndexSynchronizer;
49 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
50 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
51 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
52 import org.onap.aai.sparky.sync.enumeration.OperationState;
53 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
54 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
55 import org.onap.aai.sparky.sync.task.StoreDocumentTask;
56 import org.onap.aai.sparky.util.NodeUtils;
57 import org.slf4j.MDC;
58
59 import com.fasterxml.jackson.core.JsonProcessingException;
60 import com.fasterxml.jackson.databind.JsonNode;
61 import com.fasterxml.jackson.databind.node.ArrayNode;
62
63
64 /**
65  * The Class GeoSynchronizer.
66  */
67 public class GeoSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer {
68
69   private static final Logger LOG = LoggerFactory.getInstance().getLogger(GeoSynchronizer.class);
70
71   private boolean allWorkEnumerated;
72   private Deque<SelfLinkDescriptor> selflinks;
73   private GeoEntityLookup geoEntityLookup;
74   private OxmEntityLookup oxmEntityLookup;
75   
76   private Map<String, GeoOxmEntityDescriptor> geoDescriptorMap = null;
77
78   /**
79    * Instantiates a new geo synchronizer.
80    *
81    * @throws Exception the exception
82    */
83   public GeoSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
84       int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
85       NetworkStatisticsConfig esStatConfig, GeoEntityLookup geoEntityLookup,
86       OxmEntityLookup oxmEntityLookup) throws Exception {
87
88     super(LOG, "GEO", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig);
89     this.geoEntityLookup = geoEntityLookup;
90     this.oxmEntityLookup = oxmEntityLookup;
91     this.allWorkEnumerated = false;
92     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
93     this.synchronizerName = "Geo Synchronizer";
94     this.geoDescriptorMap = geoEntityLookup.getGeoEntityDescriptors();
95     this.aaiEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
96     this.esEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
97     this.syncDurationInMs = -1;
98   }
99
100
101   /* (non-Javadoc)
102    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
103    */
104   @Override
105   public OperationState doSync() {
106     this.syncDurationInMs = -1;
107     resetCounters();
108     setShouldSkipSync(false);
109     allWorkEnumerated = false;
110     syncStartedTimeStampInMs = System.currentTimeMillis();
111     String txnID = NodeUtils.getRandomTxnId();
112     MdcContext.initialize(txnID, "GeoSynchronizer", "", "Sync", "");
113         
114     collectAllTheWork();
115     return OperationState.OK;
116   }
117
118
119   /**
120    * Collect all the work.
121    *
122    * @return the operation state
123    */
124   public OperationState collectAllTheWork() {
125         final Map<String,String> contextMap = MDC.getCopyOfContextMap();
126
127     if (geoDescriptorMap.isEmpty()) {
128       setShouldSkipSync(true);
129       LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "geo entities");
130       return OperationState.ERROR;
131     }
132
133     Collection<String> syncTypes = geoDescriptorMap.keySet();
134
135     try {
136
137       /*
138        * launch a parallel async thread to process the documents for each entity-type (to max the of
139        * the configured executor anyway)
140        */
141
142       aaiWorkOnHand.set(syncTypes.size());
143
144       for (String key : syncTypes) {
145
146         supplyAsync(new Supplier<Void>() {
147
148           @Override
149           public Void get() {
150                 MDC.setContextMap(contextMap);
151             OperationResult typeLinksResult = null;
152             try {
153               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
154               aaiWorkOnHand.decrementAndGet();
155               processEntityTypeSelfLinks(typeLinksResult);
156             } catch (Exception exc) {
157               LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
158             }
159
160             return null;
161           }
162
163         }, aaiExecutor).whenComplete((result, error) -> {
164
165           if (error != null) {
166             LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
167           }
168         });
169
170       }
171
172       while (aaiWorkOnHand.get() != 0) {
173
174         if (LOG.isDebugEnabled()) {
175           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
176         }
177
178         Thread.sleep(1000);
179       }
180
181       aaiWorkOnHand.set(selflinks.size());
182       allWorkEnumerated = true;
183       syncEntityTypes();
184
185     } catch (Exception exc) {
186       LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
187     }
188     return OperationState.OK;
189   }
190
191   /**
192    * Sync entity types.
193    */
194   private void syncEntityTypes() {
195
196     while (selflinks.peek() != null) {
197
198       SelfLinkDescriptor linkDescriptor = selflinks.poll();
199       aaiWorkOnHand.decrementAndGet();
200
201       OxmEntityDescriptor descriptor = null;
202
203       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
204
205         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
206
207         if (descriptor == null) {
208           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
209           // go to next element in iterator
210           continue;
211         }
212
213         NetworkTransaction txn = new NetworkTransaction();
214         txn.setDescriptor(descriptor);
215         txn.setLink(linkDescriptor.getSelfLink());
216         txn.setOperationType(HttpMethod.GET);
217         txn.setEntityType(linkDescriptor.getEntityType());
218
219         aaiWorkOnHand.incrementAndGet();
220
221         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
222             .whenComplete((result, error) -> {
223
224               aaiWorkOnHand.decrementAndGet();
225
226               if (error != null) {
227                 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
228               } else {
229                 if (result == null) {
230                   LOG.error(AaiUiMsgs.SELF_LINK_GET_NO_RESPONSE, linkDescriptor.getSelfLink());
231                 } else {
232                   processEntityTypeSelfLinkResult(result);
233                 }
234               }
235             });
236       }
237     }
238   }
239
240   /**
241    * Process entity type self links.
242    *
243    * @param operationResult the operation result
244    */
245   private void processEntityTypeSelfLinks(OperationResult operationResult) {
246
247     final String jsonResult = operationResult.getResult();
248
249     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
250
251       try {
252         JsonNode rootNode = mapper.readTree(jsonResult);
253         JsonNode resultData = rootNode.get("result-data");
254
255         if (resultData.isArray()) {
256           ArrayNode resultDataArrayNode = (ArrayNode) resultData;
257
258           Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
259
260           while (elementIterator.hasNext()) {
261             JsonNode element = elementIterator.next();
262
263             final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
264             final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
265
266             if (resourceType != null && resourceLink != null) {
267
268               if (geoDescriptorMap.containsKey(resourceType)) {
269                 selflinks.add(new SelfLinkDescriptor(resourceLink + "?nodes-only", resourceType));
270               } else {
271                 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
272                 // go to next element in iterator
273                 continue;
274               }
275
276             }
277           }
278         }
279       } catch (IOException exc) {
280         LOG.error(AaiUiMsgs.ERROR_GENERIC, exc);
281       }
282     }
283
284   }
285
286   /**
287    * Process entity type self link result.
288    *
289    * @param txn the txn
290    */
291   private void processEntityTypeSelfLinkResult(NetworkTransaction txn) {
292
293     updateActiveInventoryCounters(txn);
294
295     if (!txn.getOperationResult().wasSuccessful()) {
296       return;
297     }
298     
299     GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(txn.getEntityType());
300     
301     if ( descriptor == null ) {
302       return;
303     }
304     
305     try {
306       if (descriptor.hasGeoEntity()) {
307
308         GeoIndexDocument geoDoc = new GeoIndexDocument();
309
310         final String jsonResult = txn.getOperationResult().getResult();
311
312         if (jsonResult != null && jsonResult.length() > 0) {
313
314           populateGeoDocument(geoDoc, jsonResult, txn.getDescriptor(), txn.getLink());
315
316           if (!geoDoc.isValidGeoDocument()) {
317
318             LOG.info(AaiUiMsgs.GEO_SYNC_IGNORING_ENTITY, geoDoc.getEntityType(), geoDoc.toString());
319
320           } else {
321
322             String link = null;
323             try {
324               link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), geoDoc.getId());
325             } catch (Exception exc) {
326               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc);
327             }
328
329             if (link != null) {
330
331               NetworkTransaction n2 = new NetworkTransaction();
332               n2.setLink(link);
333               n2.setEntityType(txn.getEntityType());
334               n2.setDescriptor(txn.getDescriptor());
335               n2.setOperationType(HttpMethod.PUT);
336
337               esWorkOnHand.incrementAndGet();
338
339               supplyAsync(new StoreDocumentTask(geoDoc, n2, elasticSearchAdapter), esExecutor)
340                   .whenComplete((result, error) -> {
341
342                     esWorkOnHand.decrementAndGet();
343
344                     if (error != null) {
345                       LOG.error(AaiUiMsgs.ES_STORE_FAILURE, error.getMessage());
346                     } else {
347                       updateElasticSearchCounters(result);
348                       processStoreDocumentResult(result);
349                     }
350                   });
351             }
352           }
353         }
354       }
355     } catch (JsonProcessingException exc) {
356       LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
357     } catch (IOException exc) {
358       LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
359     }
360
361     return;
362   }
363
364
365   /**
366    * Process store document result.
367    *
368    * @param txn the txn
369    */
370   private void processStoreDocumentResult(NetworkTransaction txn) {
371
372     OperationResult or = txn.getOperationResult();
373
374     if (!or.wasSuccessful()) {
375       LOG.error(AaiUiMsgs.ES_STORE_FAILURE, or.toString());
376       /*
377        * if(or.getResultCode() != 404 || (or.getResultCode() == 404 &&
378        * !synchronizerConfig.isResourceNotFoundErrorsSupressed())) { logger.error(
379        * "Skipping failed resource = " + "link" + " RC=[" + or.getResultCode() + "]. Message: " +
380        * or.getResult()); }
381        */
382
383     }
384
385   }
386
387
388   @Override
389   public SynchronizerState getState() {
390
391     if (!isSyncDone()) {
392       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
393     }
394
395     return SynchronizerState.IDLE;
396
397   }
398
399   /* (non-Javadoc)
400    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
401    */
402   @Override
403   public String getStatReport(boolean showFinalReport) {
404     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
405     return this.getStatReport(syncDurationInMs, showFinalReport);
406   }
407
408   /* (non-Javadoc)
409    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
410    */
411   @Override
412   public void shutdown() {
413     this.shutdownExecutors();
414   }
415
416   /**
417    * Populate geo document.
418    *
419    * @param doc the doc
420    * @param result the result
421    * @param resultDescriptor the result descriptor
422    * @param entityLink the entity link
423    * @throws JsonProcessingException the json processing exception
424    * @throws IOException Signals that an I/O exception has occurred.
425    */
426   protected void populateGeoDocument(GeoIndexDocument doc, String result,
427       OxmEntityDescriptor resultDescriptor, String entityLink)
428           throws JsonProcessingException, IOException {
429
430     doc.setSelfLink(entityLink);
431     doc.setEntityType(resultDescriptor.getEntityName());
432
433     JsonNode entityNode = mapper.readTree(result);
434
435     List<String> primaryKeyValues = new ArrayList<String>();
436     String pkeyValue = null;
437
438     for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
439       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
440       if (pkeyValue != null) {
441         primaryKeyValues.add(pkeyValue);
442       } else {
443         LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
444       }
445     }
446
447     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
448     doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
449     
450     GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(resultDescriptor.getEntityName());
451     
452     String geoLatKey = descriptor.getGeoLatName();
453     String geoLongKey = descriptor.getGeoLongName();
454
455     doc.setLatitude(NodeUtils.getNodeFieldAsText(entityNode, geoLatKey));
456     doc.setLongitude(NodeUtils.getNodeFieldAsText(entityNode, geoLongKey));
457     doc.deriveFields();
458
459   }
460
461   @Override
462   protected boolean isSyncDone() {
463     if (shouldSkipSync()) {
464       syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
465       return true;
466     }
467
468     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
469
470     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
471       return false;
472     }
473
474     return true;
475   }
476
477 }