Merge "Fix Blocker/Critical sonar issues"
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / synchronizer / GeoSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.synchronizer;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Deque;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.concurrent.ConcurrentLinkedDeque;
35 import java.util.function.Supplier;
36
37 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
38 import org.onap.aai.sparky.dal.NetworkTransaction;
39 import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
40 import org.onap.aai.sparky.dal.rest.HttpMethod;
41 import org.onap.aai.sparky.dal.rest.OperationResult;
42 import org.onap.aai.sparky.inventory.entity.GeoIndexDocument;
43 import org.onap.aai.sparky.logging.AaiUiMsgs;
44 import org.onap.aai.sparky.synchronizer.entity.SelfLinkDescriptor;
45 import org.onap.aai.sparky.synchronizer.enumeration.OperationState;
46 import org.onap.aai.sparky.synchronizer.enumeration.SynchronizerState;
47 import org.onap.aai.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
48 import org.onap.aai.sparky.synchronizer.task.StoreDocumentTask;
49 import org.onap.aai.sparky.util.NodeUtils;
50 import org.onap.aai.cl.api.Logger;
51 import org.onap.aai.cl.eelf.LoggerFactory;
52 import org.onap.aai.cl.mdc.MdcContext;
53 import org.slf4j.MDC;
54
55 import com.fasterxml.jackson.core.JsonProcessingException;
56 import com.fasterxml.jackson.databind.JsonNode;
57 import com.fasterxml.jackson.databind.node.ArrayNode;
58
59
60 /**
61  * The Class GeoSynchronizer.
62  */
63 public class GeoSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer {
64
65   private static final Logger LOG = LoggerFactory.getInstance().getLogger(GeoSynchronizer.class);
66
67   private boolean allWorkEnumerated;
68   private Deque<SelfLinkDescriptor> selflinks;
69
70   private ElasticSearchConfig elasticConfig = null;
71   private Map<String, OxmEntityDescriptor> geoDescriptorMap = null;
72
73   /**
74    * Instantiates a new geo synchronizer.
75    *
76    * @param indexName the index name
77    * @throws Exception the exception
78    */
79   public GeoSynchronizer(String indexName) throws Exception {
80
81     super(LOG, "GEO", 2, 5, 5, indexName);
82     this.allWorkEnumerated = false;
83     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
84     this.synchronizerName = "Geo Synchronizer";
85     this.geoDescriptorMap = oxmModelLoader.getGeoEntityDescriptors();
86     this.aaiEntityStats.initializeCountersFromOxmEntityDescriptors(geoDescriptorMap);
87     this.esEntityStats.initializeCountersFromOxmEntityDescriptors(geoDescriptorMap);
88     this.syncDurationInMs = -1;
89   }
90
91
92   /* (non-Javadoc)
93    * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#doSync()
94    */
95   @Override
96   public OperationState doSync() {
97     resetCounters();
98     allWorkEnumerated = false;
99     syncStartedTimeStampInMs = System.currentTimeMillis();
100     String txnID = NodeUtils.getRandomTxnId();
101     MdcContext.initialize(txnID, "GeoSynchronizer", "", "Sync", "");
102         
103     collectAllTheWork();
104     return OperationState.OK;
105   }
106
107
108   /**
109    * Collect all the work.
110    *
111    * @return the operation state
112    */
113   public OperationState collectAllTheWork() {
114         final Map<String,String> contextMap = MDC.getCopyOfContextMap();
115     if (elasticConfig == null) {
116       try {
117         elasticConfig = ElasticSearchConfig.getConfig();
118       } catch (Exception exc) {
119         LOG.error(AaiUiMsgs.CONFIGURATION_ERROR, "Search");
120       }
121     }
122
123     if (geoDescriptorMap.isEmpty()) {
124       setShouldSkipSync(true);
125       LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "geo entities");
126       return OperationState.ERROR;
127     }
128
129     Collection<String> syncTypes = geoDescriptorMap.keySet();
130
131     try {
132
133       /*
134        * launch a parallel async thread to process the documents for each entity-type (to max the of
135        * the configured executor anyway)
136        */
137
138       aaiWorkOnHand.set(syncTypes.size());
139
140       for (String key : syncTypes) {
141
142         supplyAsync(new Supplier<Void>() {
143
144           @Override
145           public Void get() {
146                 MDC.setContextMap(contextMap);
147             OperationResult typeLinksResult = null;
148             try {
149               typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(key);
150               aaiWorkOnHand.decrementAndGet();
151               processEntityTypeSelfLinks(typeLinksResult);
152             } catch (Exception exc) {
153               LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
154             }
155
156             return null;
157           }
158
159         }, aaiExecutor).whenComplete((result, error) -> {
160
161           if (error != null) {
162             LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
163           }
164         });
165
166       }
167
168       while (aaiWorkOnHand.get() != 0) {
169
170         if (LOG.isDebugEnabled()) {
171           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
172         }
173
174         Thread.sleep(1000);
175       }
176
177       aaiWorkOnHand.set(selflinks.size());
178       allWorkEnumerated = true;
179       syncEntityTypes();
180
181     } catch (Exception exc) {
182       LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
183     }
184     return OperationState.OK;
185   }
186
187   /**
188    * Sync entity types.
189    */
190   private void syncEntityTypes() {
191
192     while (selflinks.peek() != null) {
193
194       SelfLinkDescriptor linkDescriptor = selflinks.poll();
195       aaiWorkOnHand.decrementAndGet();
196
197       OxmEntityDescriptor descriptor = null;
198
199       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
200
201         descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
202
203         if (descriptor == null) {
204           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
205           // go to next element in iterator
206           continue;
207         }
208
209         NetworkTransaction txn = new NetworkTransaction();
210         txn.setDescriptor(descriptor);
211         txn.setLink(linkDescriptor.getSelfLink());
212         txn.setOperationType(HttpMethod.GET);
213         txn.setEntityType(linkDescriptor.getEntityType());
214
215         aaiWorkOnHand.incrementAndGet();
216
217         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
218             .whenComplete((result, error) -> {
219
220               aaiWorkOnHand.decrementAndGet();
221
222               if (error != null) {
223                 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
224               } else {
225                 if (result == null) {
226                   LOG.error(AaiUiMsgs.SELF_LINK_GET_NO_RESPONSE, linkDescriptor.getSelfLink());
227                 } else {
228                   processEntityTypeSelfLinkResult(result);
229                 }
230               }
231             });
232       }
233     }
234   }
235
236   /**
237    * Process entity type self links.
238    *
239    * @param operationResult the operation result
240    */
241   private void processEntityTypeSelfLinks(OperationResult operationResult) {
242
243     JsonNode rootNode = null;
244
245     final String jsonResult = operationResult.getResult();
246
247     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
248
249       try {
250         rootNode = mapper.readTree(jsonResult);
251       } catch (IOException exc) {
252         LOG.error(AaiUiMsgs.ERROR_GENERIC, exc);
253         return;
254       }
255
256       JsonNode resultData = rootNode.get("result-data");
257       ArrayNode resultDataArrayNode = null;
258
259       if (resultData.isArray()) {
260         resultDataArrayNode = (ArrayNode) resultData;
261
262         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
263         JsonNode element = null;
264
265         while (elementIterator.hasNext()) {
266           element = elementIterator.next();
267
268           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
269           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
270
271           if (resourceType != null && resourceLink != null) {
272
273             if (geoDescriptorMap.containsKey(resourceType)) {
274               selflinks.add(new SelfLinkDescriptor(resourceLink + "?nodes-only", resourceType));
275             } else {
276               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
277               // go to next element in iterator
278               continue;
279             }
280
281           }
282         }
283       }
284     }
285
286   }
287
288   /**
289    * Process entity type self link result.
290    *
291    * @param txn the txn
292    */
293   private void processEntityTypeSelfLinkResult(NetworkTransaction txn) {
294
295     updateActiveInventoryCounters(txn);
296
297     if (!txn.getOperationResult().wasSuccessful()) {
298       return;
299     }
300
301     try {
302       if (!(txn.getDescriptor().getGeoLatName().isEmpty()
303           && txn.getDescriptor().getGeoLongName().isEmpty())) {
304
305         GeoIndexDocument geoDoc = new GeoIndexDocument(oxmModelLoader);
306
307         final String jsonResult = txn.getOperationResult().getResult();
308
309         if (jsonResult != null && jsonResult.length() > 0) {
310
311           populateGeoDocument(geoDoc, jsonResult, txn.getDescriptor(), txn.getLink());
312
313           if (!geoDoc.isValidGeoDocument()) {
314
315             LOG.info(AaiUiMsgs.GEO_SYNC_IGNORING_ENTITY, geoDoc.getEntityType(), geoDoc.toString());
316
317           } else {
318
319             String link = null;
320             try {
321               link = getElasticFullUrl("/" + geoDoc.getId(), getIndexName(), "default");
322             } catch (Exception exc) {
323               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc);
324             }
325
326             if (link != null) {
327
328               NetworkTransaction n2 = new NetworkTransaction();
329               n2.setLink(link);
330               n2.setEntityType(txn.getEntityType());
331               n2.setDescriptor(txn.getDescriptor());
332               n2.setOperationType(HttpMethod.PUT);
333
334               esWorkOnHand.incrementAndGet();
335
336               supplyAsync(new StoreDocumentTask(geoDoc, n2, esDataProvider), esExecutor)
337                   .whenComplete((result, error) -> {
338
339                     esWorkOnHand.decrementAndGet();
340
341                     if (error != null) {
342                       LOG.error(AaiUiMsgs.ES_STORE_FAILURE, error.getMessage());
343                     } else {
344                       updateElasticSearchCounters(result);
345                       processStoreDocumentResult(result);
346                     }
347                   });
348             }
349           }
350         }
351       }
352     } catch (JsonProcessingException exc) {
353       LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
354     } catch (IOException exc) {
355       LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
356     }
357
358     return;
359   }
360
361
362   /**
363    * Process store document result.
364    *
365    * @param txn the txn
366    */
367   private void processStoreDocumentResult(NetworkTransaction txn) {
368
369     OperationResult or = txn.getOperationResult();
370
371     if (!or.wasSuccessful()) {
372       LOG.error(AaiUiMsgs.ES_STORE_FAILURE, or.toString());
373       /*
374        * if(or.getResultCode() != 404 || (or.getResultCode() == 404 &&
375        * !synchronizerConfig.isResourceNotFoundErrorsSupressed())) { logger.error(
376        * "Skipping failed resource = " + "link" + " RC=[" + or.getResultCode() + "]. Message: " +
377        * or.getResult()); }
378        */
379
380     }
381
382   }
383
384
385   @Override
386   public SynchronizerState getState() {
387
388     if (!isSyncDone()) {
389       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
390     }
391
392     return SynchronizerState.IDLE;
393
394   }
395
396   /* (non-Javadoc)
397    * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
398    */
399   @Override
400   public String getStatReport(boolean showFinalReport) {
401           syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
402           return this.getStatReport(syncDurationInMs, showFinalReport);
403   }
404
405   /* (non-Javadoc)
406    * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#shutdown()
407    */
408   @Override
409   public void shutdown() {
410     this.shutdownExecutors();
411   }
412
413   /**
414    * Populate geo document.
415    *
416    * @param doc the doc
417    * @param result the result
418    * @param resultDescriptor the result descriptor
419    * @param entityLink the entity link
420    * @throws JsonProcessingException the json processing exception
421    * @throws IOException Signals that an I/O exception has occurred.
422    */
423   protected void populateGeoDocument(GeoIndexDocument doc, String result,
424       OxmEntityDescriptor resultDescriptor, String entityLink)
425           throws JsonProcessingException, IOException {
426
427     doc.setSelfLink(entityLink);
428     doc.setEntityType(resultDescriptor.getEntityName());
429
430     JsonNode entityNode = mapper.readTree(result);
431
432     List<String> primaryKeyValues = new ArrayList<String>();
433     String pkeyValue = null;
434
435     for (String keyName : resultDescriptor.getPrimaryKeyAttributeName()) {
436       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
437       if (pkeyValue != null) {
438         primaryKeyValues.add(pkeyValue);
439       } else {
440         LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
441       }
442     }
443
444     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
445     doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
446     String geoLatKey = resultDescriptor.getGeoLatName();
447     String geoLongKey = resultDescriptor.getGeoLongName();
448
449     doc.setLatitude(NodeUtils.getNodeFieldAsText(entityNode, geoLatKey));
450     doc.setLongitude(NodeUtils.getNodeFieldAsText(entityNode, geoLongKey));
451     doc.deriveFields();
452
453   }
454
455   @Override
456   protected boolean isSyncDone() {
457     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
458
459     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
460       return false;
461     }
462
463     return true;
464   }
465
466 }