Initial commit for AAI-UI(sparky-backend)
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / ElasticSearchIndexCleaner.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25
26 package org.openecomp.sparky.synchronizer;
27
28 import com.fasterxml.jackson.core.JsonProcessingException;
29 import com.fasterxml.jackson.databind.JsonNode;
30 import com.fasterxml.jackson.databind.ObjectMapper;
31 import com.fasterxml.jackson.databind.node.ArrayNode;
32 import com.fasterxml.jackson.databind.node.ObjectNode;
33
34 import java.io.IOException;
35 import java.util.ArrayList;
36 import java.util.Collection;
37 import java.util.Iterator;
38 import java.util.List;
39
40 import org.openecomp.sparky.dal.rest.OperationResult;
41 import org.openecomp.sparky.dal.rest.RestDataProvider;
42 import org.openecomp.sparky.synchronizer.entity.ObjectIdCollection;
43 import org.openecomp.sparky.synchronizer.entity.SearchableEntity;
44 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
45 import org.openecomp.cl.api.Logger;
46 import org.openecomp.cl.eelf.LoggerFactory;
47 import org.openecomp.sparky.logging.AaiUiMsgs;
48
49 /**
50  * The Class ElasticSearchIndexCleaner.
51  */
52 public class ElasticSearchIndexCleaner implements IndexCleaner {
53
54   private static final Logger LOG =
55       LoggerFactory.getInstance().getLogger(ElasticSearchIndexCleaner.class);
56
57   private static final String BULK_OP_LINE_TEMPLATE = "%s\n";
58   private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
59
60   private ObjectIdCollection before;
61   private ObjectIdCollection after;
62
63   private String host;
64   private String port;
65
66   private String indexName;
67   private String indexType;
68   private int scrollContextTimeToLiveInMinutes;
69   private int numItemsToGetBulkRequest;
70
71   private RestDataProvider restDataProvider;
72   private ObjectMapper mapper;
73
74   /**
75    * Instantiates a new elastic search index cleaner.
76    *
77    * @param restDataProvider the rest data provider
78    * @param indexName the index name
79    * @param indexType the index type
80    * @param host the host
81    * @param port the port
82    * @param scrollContextTimeToLiveInMinutes the scroll context time to live in minutes
83    * @param numItemsToGetBulkRequest the num items to get bulk request
84    */
85   protected ElasticSearchIndexCleaner(RestDataProvider restDataProvider, String indexName,
86       String indexType, String host, String port, int scrollContextTimeToLiveInMinutes,
87       int numItemsToGetBulkRequest) {
88     this.restDataProvider = restDataProvider;
89     this.before = null;
90     this.after = null;
91     this.indexName = indexName;
92     this.indexType = indexType;
93     this.mapper = new ObjectMapper();
94     this.host = host;
95     this.port = port;
96     this.scrollContextTimeToLiveInMinutes = scrollContextTimeToLiveInMinutes;
97     this.numItemsToGetBulkRequest = numItemsToGetBulkRequest;
98   }
99
100   /* (non-Javadoc)
101    * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePreOperationCollection()
102    */
103   @Override
104   public OperationState populatePreOperationCollection() {
105
106     try {
107       before = retrieveAllDocumentIdentifiers();
108       return OperationState.OK;
109     } catch (Exception exc) {
110       LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage());
111       return OperationState.ERROR;
112     }
113
114   }
115
116   /* (non-Javadoc)
117    * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePostOperationCollection()
118    */
119   @Override
120   public OperationState populatePostOperationCollection() {
121     try {
122       after = retrieveAllDocumentIdentifiers();
123       return OperationState.OK;
124     } catch (Exception exc) {
125       LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage());
126       return OperationState.ERROR;
127     }
128   }
129
130   /* (non-Javadoc)
131    * @see org.openecomp.sparky.synchronizer.IndexCleaner#performCleanup()
132    */
133   @Override
134   public OperationState performCleanup() {
135     // TODO Auto-generated method stub
136     LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, indexName);
137
138     int sizeBefore = before.getSize();
139     int sizeAfter = after.getSize();
140
141     LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP_SIZE, String.valueOf(sizeBefore),
142         String.valueOf(sizeAfter));
143
144     /*
145      * If the processedImportIds size <= 0, then something has failed in the sync operation and we
146      * shouldn't do the selective delete right now.
147      */
148
149     if (sizeAfter > 0) {
150
151       Collection<String> presyncIds = before.getImportedObjectIds();
152       presyncIds.removeAll(after.getImportedObjectIds());
153
154       try {
155         LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, indexName, indexType,
156             String.valueOf(presyncIds.size()));
157
158         ObjectIdCollection bulkIds = new ObjectIdCollection();
159
160         Iterator<String> it = presyncIds.iterator();
161         int numItemsInBulkRequest = 0;
162         int numItemsRemainingToBeDeleted = presyncIds.size();
163
164         while (it.hasNext()) {
165
166           bulkIds.addObjectId(it.next());
167           numItemsInBulkRequest++;
168
169           if (numItemsInBulkRequest >= this.numItemsToGetBulkRequest) {
170             LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize()));
171             OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIds());
172             // pegCountersForElasticBulkDelete(bulkDeleteResult);
173             numItemsRemainingToBeDeleted -= numItemsInBulkRequest;
174             numItemsInBulkRequest = 0;
175             bulkIds.clear();
176           }
177         }
178
179         if (numItemsRemainingToBeDeleted > 0) {
180           LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize()));
181           OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIds());
182           // pegCountersForElasticBulkDelete(bulkDeleteResult);
183         }
184
185
186       } catch (Exception exc) {
187         LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, indexName, exc.getLocalizedMessage());
188
189       }
190     }
191
192     return OperationState.OK;
193   }
194
195   @Override
196   public String getIndexName() {
197     return indexName;
198   }
199
200   public void setIndexName(String indexName) {
201     this.indexName = indexName;
202   }
203
204   /**
205    * Builds the initial scroll request payload.
206    *
207    * @param numItemsToGetPerRequest the num items to get per request
208    * @param fieldList the field list
209    * @return the string
210    * @throws JsonProcessingException the json processing exception
211    */
212   protected String buildInitialScrollRequestPayload(int numItemsToGetPerRequest,
213       List<String> fieldList) throws JsonProcessingException {
214
215     ObjectNode rootNode = mapper.createObjectNode();
216     rootNode.put("size", numItemsToGetPerRequest);
217
218     ArrayNode fields = mapper.createArrayNode();
219
220     for (String f : fieldList) {
221       fields.add(f);
222     }
223
224     rootNode.set("fields", fields);
225
226     ObjectNode queryNode = mapper.createObjectNode();
227     queryNode.set("match_all", mapper.createObjectNode());
228
229     rootNode.set("query", queryNode);
230
231     return mapper.writeValueAsString(rootNode);
232
233   }
234
235   /**
236    * Builds the subsequent scroll context request payload.
237    *
238    * @param scrollId the scroll id
239    * @param contextTimeToLiveInMinutes the context time to live in minutes
240    * @return the string
241    * @throws JsonProcessingException the json processing exception
242    */
243   protected String buildSubsequentScrollContextRequestPayload(String scrollId,
244       int contextTimeToLiveInMinutes) throws JsonProcessingException {
245
246     ObjectNode rootNode = mapper.createObjectNode();
247
248     rootNode.put("scroll", contextTimeToLiveInMinutes + "m");
249     rootNode.put("scroll_id", scrollId);
250
251     return mapper.writeValueAsString(rootNode);
252
253   }
254
255   /**
256    * Parses the elastic search result.
257    *
258    * @param jsonResult the json result
259    * @return the json node
260    * @throws JsonProcessingException the json processing exception
261    * @throws IOException Signals that an I/O exception has occurred.
262    */
263   protected JsonNode parseElasticSearchResult(String jsonResult)
264       throws JsonProcessingException, IOException {
265     ObjectMapper mapper = new ObjectMapper();
266     return mapper.readTree(jsonResult);
267   }
268
269   /**
270    * Lookup index doc.
271    *
272    * @param ids the ids
273    * @param docs the docs
274    * @return the array list
275    */
276   protected ArrayList<SearchableEntity> lookupIndexDoc(ArrayList<String> ids,
277       List<SearchableEntity> docs) {
278     ArrayList<SearchableEntity> objs = new ArrayList<SearchableEntity>();
279
280     if (ids != null && docs != null) {
281       for (SearchableEntity d : docs) {
282         if (ids.contains(d.getId())) {
283           objs.add(d);
284         }
285       }
286     }
287
288     return objs;
289   }
290
291   /**
292    * Builds the delete data object.
293    *
294    * @param index the index
295    * @param type the type
296    * @param id the id
297    * @return the object node
298    */
299   protected ObjectNode buildDeleteDataObject(String index, String type, String id) {
300
301     ObjectNode indexDocProperties = mapper.createObjectNode();
302
303     indexDocProperties.put("_index", index);
304     indexDocProperties.put("_type", type);
305     indexDocProperties.put("_id", id);
306
307     ObjectNode rootNode = mapper.createObjectNode();
308     rootNode.set("delete", indexDocProperties);
309
310     return rootNode;
311   }
312
313   /**
314    * This method might appear to be a little strange, and is simply an optimization to take an
315    * elipsed JsonNode key path and retrieve the node at the end of the path, if it exists.
316    *
317    * @param startNode the start node
318    * @param fieldPath the field path
319    * @return the node path
320    */
321   protected JsonNode getNodePath(JsonNode startNode, String... fieldPath) {
322
323     JsonNode jsonNode = null;
324
325     for (String field : fieldPath) {
326       if (jsonNode == null) {
327         jsonNode = startNode.get(field);
328       } else {
329         jsonNode = jsonNode.get(field);
330       }
331
332       /*
333        * This is our safety net in case any intermediate path returns a null
334        */
335
336       if (jsonNode == null) {
337         return null;
338       }
339
340     }
341
342     return jsonNode;
343   }
344
345   /**
346    * Gets the full url.
347    *
348    * @param resourceUrl the resource url
349    * @return the full url
350    */
351   private String getFullUrl(String resourceUrl) {
352     return String.format("http://%s:%s%s", host, port, resourceUrl);
353   }
354
355   /**
356    * Retrieve all document identifiers.
357    *
358    * @return the object id collection
359    * @throws IOException Signals that an I/O exception has occurred.
360    */
361   public ObjectIdCollection retrieveAllDocumentIdentifiers() throws IOException {
362
363     ObjectIdCollection currentDocumentIds = new ObjectIdCollection();
364
365     long opStartTimeInMs = System.currentTimeMillis();
366
367     List<String> fields = new ArrayList<String>();
368     fields.add("_id");
369     // fields.add("entityType");
370
371     String scrollRequestPayload =
372         buildInitialScrollRequestPayload(this.numItemsToGetBulkRequest, fields);
373
374     final String fullUrlStr = getFullUrl("/" + indexName + "/" + indexType + "/_search?scroll="
375         + this.scrollContextTimeToLiveInMinutes + "m");
376
377     OperationResult result =
378         restDataProvider.doPost(fullUrlStr, scrollRequestPayload, "application/json");
379
380     if (result.wasSuccessful()) {
381
382       JsonNode rootNode = parseElasticSearchResult(result.getResult());
383
384       /*
385        * Check the result for success / failure, and enumerate all the index ids that resulted in
386        * success, and ignore the ones that failed or log them so we have a record of the failure.
387        */
388       int totalRecordsAvailable = 0;
389       String scrollId = null;
390       int numRecordsFetched = 0;
391
392       if (rootNode != null) {
393
394         scrollId = getFieldValue(rootNode, "_scroll_id");
395         final String tookStr = getFieldValue(rootNode, "took");
396         int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
397         boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
398
399         if (timedOut) {
400           LOG.error(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "all document Identifiers",
401               String.valueOf(tookInMs));
402         } else {
403           LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "all document Identifiers",
404               String.valueOf(tookInMs));
405         }
406
407         JsonNode hitsNode = rootNode.get("hits");
408         totalRecordsAvailable = Integer.parseInt(hitsNode.get("total").asText());
409
410         LOG.info(AaiUiMsgs.COLLECT_TOTAL, "all document Identifiers",
411             String.valueOf(totalRecordsAvailable));
412
413         /*
414          * Collect all object ids
415          */
416
417         ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
418
419         Iterator<JsonNode> nodeIterator = hitsArray.iterator();
420
421         String key = null;
422         String value = null;
423         JsonNode jsonNode = null;
424
425         while (nodeIterator.hasNext()) {
426
427           jsonNode = nodeIterator.next();
428
429           key = getFieldValue(jsonNode, "_id");
430
431           if (key != null) {
432             currentDocumentIds.addObjectId(key);
433           }
434
435           /*
436            * if (key != null) {
437            * 
438            * JsonNode fieldsNode = jNode.get("fields");
439            * 
440            * if (fieldsNode != null) {
441            * 
442            * JsonNode entityTypeNode = fieldsNode.get("entityType");
443            * 
444            * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode;
445            * 
446            * if (aNode.size() > 0) { value = aNode.get(0).asText(); objAndtTypesMap.put(key, value);
447            * numRecordsFetched++; } } } }
448            */
449
450         }
451
452         int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched);
453
454         int numRequiredAdditionalFetches =
455             (totalRecordsRemainingToFetch / this.numItemsToGetBulkRequest);
456
457         /*
458          * Do an additional fetch for the remaining items (if needed)
459          */
460
461         if (totalRecordsRemainingToFetch % numItemsToGetBulkRequest != 0) {
462           numRequiredAdditionalFetches += 1;
463         }
464
465         if (LOG.isDebugEnabled()) {
466           LOG.debug(AaiUiMsgs.SYNC_NUMBER_REQ_FETCHES,
467               String.valueOf(numRequiredAdditionalFetches));
468         }
469
470
471         for (int x = 0; x < numRequiredAdditionalFetches; x++) {
472
473           if (collectItemsFromScrollContext(scrollId, currentDocumentIds) != OperationState.OK) {
474             // abort the whole thing because now we can't reliably cleanup the orphans.
475             throw new IOException(
476                 "Failed to collect pre-sync doc collection from index.  Aborting operation");
477           }
478           if (LOG.isDebugEnabled()) {
479             LOG.debug(AaiUiMsgs.SYNC_NUMBER_TOTAL_FETCHES,
480                 String.valueOf(currentDocumentIds.getSize()),
481                 String.valueOf(totalRecordsAvailable));
482           }
483
484         }
485
486       }
487
488     } else {
489       // scroll context get failed, nothing else to do
490       LOG.error(AaiUiMsgs.ERROR_GENERIC, result.toString());
491     }
492
493     LOG.info(AaiUiMsgs.COLLECT_TOTAL_TIME, "all document Identifiers",
494         String.valueOf((System.currentTimeMillis() - opStartTimeInMs)));
495
496     return currentDocumentIds;
497
498   }
499
500   /**
501    * Collect items from scroll context.
502    *
503    * @param scrollId the scroll id
504    * @param objectIds the object ids
505    * @return the operation state
506    * @throws IOException Signals that an I/O exception has occurred.
507    */
508   private OperationState collectItemsFromScrollContext(String scrollId,
509       ObjectIdCollection objectIds) throws IOException {
510
511     // ObjectIdCollection documentIdCollection = new ObjectIdCollection();
512
513     String requestPayload =
514         buildSubsequentScrollContextRequestPayload(scrollId, scrollContextTimeToLiveInMinutes);
515
516     final String fullUrlStr = getFullUrl("/_search/scroll");
517
518     OperationResult opResult =
519         restDataProvider.doPost(fullUrlStr, requestPayload, "application/json");
520
521     if (opResult.getResultCode() >= 300) {
522       LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult());
523       return OperationState.ERROR;
524     }
525
526     JsonNode rootNode = parseElasticSearchResult(opResult.getResult());
527     boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
528     final String tookStr = getFieldValue(rootNode, "took");
529     int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
530
531     JsonNode hitsNode = rootNode.get("hits");
532
533     /*
534      * Check the result for success / failure, and enumerate all the index ids that resulted in
535      * success, and ignore the ones that failed or log them so we have a record of the failure.
536      */
537
538     if (rootNode != null) {
539
540       if (timedOut) {
541         LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs));
542       } else {
543         LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "Scroll Context", String.valueOf(tookInMs));
544       }
545
546       /*
547        * Collect all object ids
548        */
549
550       ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
551       String key = null;
552       String value = null;
553       JsonNode jsonNode = null;
554
555       Iterator<JsonNode> nodeIterator = hitsArray.iterator();
556
557       while (nodeIterator.hasNext()) {
558
559         jsonNode = nodeIterator.next();
560
561         key = getFieldValue(jsonNode, "_id");
562
563         if (key != null) {
564           objectIds.addObjectId(key);
565
566           /*
567            * JsonNode fieldsNode = jNode.get("fields");
568            * 
569            * if (fieldsNode != null) {
570            * 
571            * JsonNode entityTypeNode = fieldsNode.get("entityType");
572            * 
573            * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode;
574            * 
575            * if (aNode.size() > 0) { value = aNode.get(0).asText(); objectIdsAndTypes.put(key,
576            * value); } } } }
577            */
578
579         }
580
581       }
582     }
583
584     return OperationState.OK;
585   }
586
587   /**
588    * Gets the field value.
589    *
590    * @param node the node
591    * @param fieldName the field name
592    * @return the field value
593    */
594   protected String getFieldValue(JsonNode node, String fieldName) {
595
596     JsonNode field = node.get(fieldName);
597
598     if (field != null) {
599       return field.asText();
600     }
601
602     return null;
603
604   }
605
606   /**
607    * Bulk delete.
608    *
609    * @param docIds the doc ids
610    * @return the operation result
611    * @throws IOException Signals that an I/O exception has occurred.
612    */
613   public OperationResult bulkDelete(Collection<String> docIds) throws IOException {
614
615     if (docIds == null || docIds.size() == 0) {
616       LOG.info(AaiUiMsgs.ES_BULK_DELETE_SKIP);
617       return new OperationResult(500,
618           "Skipping bulkDelete(); operation because docs to delete list is empty");
619     }
620
621     LOG.info(AaiUiMsgs.ES_BULK_DELETE_START, String.valueOf(docIds.size()));
622
623     StringBuilder sb = new StringBuilder(128);
624
625     for (String id : docIds) {
626       sb.append(
627           String.format(BULK_OP_LINE_TEMPLATE, buildDeleteDataObject(indexName, indexType, id)));
628     }
629
630     sb.append("\n");
631
632     final String fullUrlStr = getFullUrl("/_bulk");
633
634     return restDataProvider.doPost(fullUrlStr, sb.toString(), "application/x-www-form-urlencoded");
635
636   }
637
638   /*
639   
640   */
641
642 }