Initial commit for AAI-UI(sparky-backend)
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / GeoSynchronizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25
26 package org.openecomp.sparky.synchronizer;
27
28 import static java.util.concurrent.CompletableFuture.supplyAsync;
29
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.UnknownHostException;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.Deque;
36 import java.util.Iterator;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.concurrent.ConcurrentLinkedDeque;
40 import java.util.function.Supplier;
41
42 import org.openecomp.cl.api.Logger;
43 import org.openecomp.cl.eelf.LoggerFactory;
44 import org.openecomp.sparky.config.oxm.OxmEntityDescriptor;
45 import org.openecomp.sparky.dal.NetworkTransaction;
46 import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig;
47 import org.openecomp.sparky.dal.rest.HttpMethod;
48 import org.openecomp.sparky.dal.rest.OperationResult;
49 import org.openecomp.sparky.inventory.entity.GeoIndexDocument;
50 import org.openecomp.sparky.logging.AaiUiMsgs;
51 import org.openecomp.sparky.synchronizer.entity.SelfLinkDescriptor;
52 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
53 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
54 import org.openecomp.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
55 import org.openecomp.sparky.synchronizer.task.StoreDocumentTask;
56 import org.openecomp.sparky.util.NodeUtils;
57 import org.slf4j.MDC;
58
59 import org.openecomp.cl.mdc.MdcContext;
60 import com.fasterxml.jackson.core.JsonProcessingException;
61 import com.fasterxml.jackson.databind.JsonNode;
62 import com.fasterxml.jackson.databind.node.ArrayNode;
63
64
65 /**
66  * The Class GeoSynchronizer.
67  */
68 public class GeoSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer {
69
70   private static final Logger LOG = LoggerFactory.getInstance().getLogger(GeoSynchronizer.class);
71
72   private boolean allWorkEnumerated;
73   private Deque<SelfLinkDescriptor> selflinks;
74
75   private ElasticSearchConfig elasticConfig = null;
76   private Map<String, OxmEntityDescriptor> geoDescriptorMap = null;
77
78   /**
79    * Instantiates a new geo synchronizer.
80    *
81    * @param indexName the index name
82    * @throws Exception the exception
83    */
84   public GeoSynchronizer(String indexName) throws Exception {
85
86     super(LOG, "GEO", 2, 5, 5, indexName);
87     this.allWorkEnumerated = false;
88     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
89     this.synchronizerName = "Geo Synchronizer";
90     this.geoDescriptorMap = oxmModelLoader.getGeoEntityDescriptors();
91     this.aaiEntityStats.initializeCountersFromOxmEntityDescriptors(geoDescriptorMap);
92     this.esEntityStats.initializeCountersFromOxmEntityDescriptors(geoDescriptorMap);
93
94   }
95
96
97   /* (non-Javadoc)
98    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
99    */
100   @Override
101   public OperationState doSync() {
102     resetCounters();
103     allWorkEnumerated = false;
104     syncStartedTimeStampInMs = System.currentTimeMillis();
105     String txnID = NodeUtils.getRandomTxnId();
106     MdcContext.initialize(txnID, "GeoSynchronizer", "", "Sync", "");
107         
108     collectAllTheWork();
109     return OperationState.OK;
110   }
111
112
113   /**
114    * Collect all the work.
115    *
116    * @return the operation state
117    */
118   public OperationState collectAllTheWork() {
119         final Map<String,String> contextMap = MDC.getCopyOfContextMap();
120     if (elasticConfig == null) {
121       try {
122         elasticConfig = ElasticSearchConfig.getConfig();
123       } catch (Exception exc) {
124         LOG.error(AaiUiMsgs.CONFIGURATION_ERROR, "Search");
125       }
126     }
127
128     if (geoDescriptorMap.isEmpty()) {
129       LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "geo entities");
130       return OperationState.ERROR;
131     }
132
133     Collection<String> syncTypes = geoDescriptorMap.keySet();
134
135     try {
136
137       /*
138        * launch a parallel async thread to process the documents for each entity-type (to max the of
139        * the configured executor anyway)
140        */
141
142       aaiWorkOnHand.set(syncTypes.size());
143
144       for (String key : syncTypes) {
145
146         supplyAsync(new Supplier<Void>() {
147
148           @Override
149           public Void get() {
150                 MDC.setContextMap(contextMap);
151             OperationResult typeLinksResult = null;
152             try {
153               typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(key);
154               aaiWorkOnHand.decrementAndGet();
155               processEntityTypeSelfLinks(typeLinksResult);
156             } catch (Exception exc) {
157               LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
158             }
159
160             return null;
161           }
162
163         }, aaiExecutor).whenComplete((result, error) -> {
164
165           if (error != null) {
166             LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
167           }
168         });
169
170       }
171
172       while (aaiWorkOnHand.get() != 0) {
173
174         if (LOG.isDebugEnabled()) {
175           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
176         }
177
178         Thread.sleep(1000);
179       }
180
181       aaiWorkOnHand.set(selflinks.size());
182       allWorkEnumerated = true;
183       syncEntityTypes();
184
185     } catch (Exception exc) {
186       LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
187     }
188     return OperationState.OK;
189   }
190
191   /**
192    * Sync entity types.
193    */
194   private void syncEntityTypes() {
195
196     while (selflinks.peek() != null) {
197
198       SelfLinkDescriptor linkDescriptor = selflinks.poll();
199       aaiWorkOnHand.decrementAndGet();
200
201       OxmEntityDescriptor descriptor = null;
202
203       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
204
205         descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
206
207         if (descriptor == null) {
208           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
209           // go to next element in iterator
210           continue;
211         }
212
213         NetworkTransaction txn = new NetworkTransaction();
214         txn.setDescriptor(descriptor);
215         txn.setLink(linkDescriptor.getSelfLink());
216         txn.setOperationType(HttpMethod.GET);
217         txn.setEntityType(linkDescriptor.getEntityType());
218
219         aaiWorkOnHand.incrementAndGet();
220
221         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
222             .whenComplete((result, error) -> {
223
224               aaiWorkOnHand.decrementAndGet();
225
226               if (error != null) {
227                 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
228               } else {
229                 if (result == null) {
230                   LOG.error(AaiUiMsgs.SELF_LINK_GET_NO_RESPONSE, linkDescriptor.getSelfLink());
231                 } else {
232                   processEntityTypeSelfLinkResult(result);
233                 }
234               }
235             });
236       }
237     }
238   }
239
240   /**
241    * Process entity type self links.
242    *
243    * @param operationResult the operation result
244    */
245   private void processEntityTypeSelfLinks(OperationResult operationResult) {
246
247     JsonNode rootNode = null;
248
249     final String jsonResult = operationResult.getResult();
250
251     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
252
253       try {
254         rootNode = mapper.readTree(jsonResult);
255       } catch (IOException exc) {
256         LOG.error(AaiUiMsgs.ERROR_GENERIC, exc);
257       }
258
259       JsonNode resultData = rootNode.get("result-data");
260       ArrayNode resultDataArrayNode = null;
261
262       if (resultData.isArray()) {
263         resultDataArrayNode = (ArrayNode) resultData;
264
265         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
266         JsonNode element = null;
267
268         while (elementIterator.hasNext()) {
269           element = elementIterator.next();
270
271           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
272           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
273
274           if (resourceType != null && resourceLink != null) {
275
276             if (geoDescriptorMap.containsKey(resourceType)) {
277               selflinks.add(new SelfLinkDescriptor(resourceLink + "?nodes-only", resourceType));
278             } else {
279               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
280               // go to next element in iterator
281               continue;
282             }
283
284           }
285         }
286       }
287     }
288
289   }
290
291   /**
292    * Process entity type self link result.
293    *
294    * @param txn the txn
295    */
296   private void processEntityTypeSelfLinkResult(NetworkTransaction txn) {
297
298     updateActiveInventoryCounters(txn);
299
300     if (!txn.getOperationResult().wasSuccessful()) {
301       return;
302     }
303
304     try {
305       if (!(txn.getDescriptor().getGeoLatName().isEmpty()
306           && txn.getDescriptor().getGeoLongName().isEmpty())) {
307
308         GeoIndexDocument geoDoc = new GeoIndexDocument(oxmModelLoader);
309
310         final String jsonResult = txn.getOperationResult().getResult();
311
312         if (jsonResult != null && jsonResult.length() > 0) {
313
314           populateGeoDocument(geoDoc, jsonResult, txn.getDescriptor(), txn.getLink());
315
316           if (!geoDoc.isValidGeoDocument()) {
317
318             LOG.info(AaiUiMsgs.GEO_SYNC_IGNORING_ENTITY, geoDoc.getEntityType(), geoDoc.toString());
319
320           } else {
321
322             String link = null;
323             try {
324               link = getElasticFullUrl("/" + geoDoc.getId(), getIndexName(), "default");
325             } catch (Exception exc) {
326               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc);
327             }
328
329             if (link != null) {
330
331               NetworkTransaction n2 = new NetworkTransaction();
332               n2.setLink(link);
333               n2.setEntityType(txn.getEntityType());
334               n2.setDescriptor(txn.getDescriptor());
335               n2.setOperationType(HttpMethod.PUT);
336
337               esWorkOnHand.incrementAndGet();
338
339               supplyAsync(new StoreDocumentTask(geoDoc, n2, esDataProvider), esExecutor)
340                   .whenComplete((result, error) -> {
341
342                     esWorkOnHand.decrementAndGet();
343
344                     if (error != null) {
345                       LOG.error(AaiUiMsgs.ES_STORE_FAILURE, error.getMessage());
346                     } else {
347                       updateElasticSearchCounters(result);
348                       processStoreDocumentResult(result);
349                     }
350                   });
351             }
352           }
353         }
354       }
355     } catch (JsonProcessingException exc) {
356       LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
357     } catch (IOException exc) {
358       LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
359     }
360
361     return;
362   }
363
364
365   /**
366    * Process store document result.
367    *
368    * @param txn the txn
369    */
370   private void processStoreDocumentResult(NetworkTransaction txn) {
371
372     OperationResult or = txn.getOperationResult();
373
374     if (!or.wasSuccessful()) {
375       LOG.error(AaiUiMsgs.ES_STORE_FAILURE, or.toString());
376       /*
377        * if(or.getResultCode() != 404 || (or.getResultCode() == 404 &&
378        * !synchronizerConfig.isResourceNotFoundErrorsSupressed())) { logger.error(
379        * "Skipping failed resource = " + "link" + " RC=[" + or.getResultCode() + "]. Message: " +
380        * or.getResult()); }
381        */
382
383     }
384
385   }
386
387
388   @Override
389   public SynchronizerState getState() {
390
391     if (!isSyncDone()) {
392       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
393     }
394
395     return SynchronizerState.IDLE;
396
397   }
398
399   /* (non-Javadoc)
400    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
401    */
402   @Override
403   public String getStatReport(boolean showFinalReport) {
404     return this.getStatReport(System.currentTimeMillis() - syncStartedTimeStampInMs,
405         showFinalReport);
406   }
407
408   /* (non-Javadoc)
409    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
410    */
411   @Override
412   public void shutdown() {
413     this.shutdownExecutors();
414   }
415
416   /**
417    * Populate geo document.
418    *
419    * @param doc the doc
420    * @param result the result
421    * @param resultDescriptor the result descriptor
422    * @param entityLink the entity link
423    * @throws JsonProcessingException the json processing exception
424    * @throws IOException Signals that an I/O exception has occurred.
425    */
426   protected void populateGeoDocument(GeoIndexDocument doc, String result,
427       OxmEntityDescriptor resultDescriptor, String entityLink)
428           throws JsonProcessingException, IOException {
429
430     doc.setSelfLink(entityLink);
431     doc.setEntityType(resultDescriptor.getEntityName());
432
433     JsonNode entityNode = mapper.readTree(result);
434
435     List<String> primaryKeyValues = new ArrayList<String>();
436     String pkeyValue = null;
437
438     for (String keyName : resultDescriptor.getPrimaryKeyAttributeName()) {
439       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
440       if (pkeyValue != null) {
441         primaryKeyValues.add(pkeyValue);
442       } else {
443         LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
444       }
445     }
446
447     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
448     doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
449     String geoLatKey = resultDescriptor.getGeoLatName();
450     String geoLongKey = resultDescriptor.getGeoLongName();
451
452     doc.setLatitude(NodeUtils.getNodeFieldAsText(entityNode, geoLatKey));
453     doc.setLongitude(NodeUtils.getNodeFieldAsText(entityNode, geoLongKey));
454     doc.deriveFields();
455
456   }
457
458   @Override
459   protected boolean isSyncDone() {
460     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
461
462     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
463       return false;
464     }
465
466     return true;
467   }
468
469 }