Changing the license and trademark
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / ElasticSearchIndexCleaner.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.openecomp.sparky.synchronizer;
24
25 import com.fasterxml.jackson.core.JsonProcessingException;
26 import com.fasterxml.jackson.databind.JsonNode;
27 import com.fasterxml.jackson.databind.ObjectMapper;
28 import com.fasterxml.jackson.databind.node.ArrayNode;
29 import com.fasterxml.jackson.databind.node.ObjectNode;
30
31 import java.io.IOException;
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.Iterator;
35 import java.util.List;
36
37 import org.openecomp.sparky.dal.rest.OperationResult;
38 import org.openecomp.sparky.dal.rest.RestDataProvider;
39 import org.openecomp.sparky.synchronizer.entity.ObjectIdCollection;
40 import org.openecomp.sparky.synchronizer.entity.SearchableEntity;
41 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
42 import org.openecomp.cl.api.Logger;
43 import org.openecomp.cl.eelf.LoggerFactory;
44 import org.openecomp.sparky.logging.AaiUiMsgs;
45
46 /**
47  * The Class ElasticSearchIndexCleaner.
48  */
49 public class ElasticSearchIndexCleaner implements IndexCleaner {
50
51   private static final Logger LOG =
52       LoggerFactory.getInstance().getLogger(ElasticSearchIndexCleaner.class);
53
54   private static final String BULK_OP_LINE_TEMPLATE = "%s\n";
55   private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
56
57   private ObjectIdCollection before;
58   private ObjectIdCollection after;
59
60   private String host;
61   private String port;
62
63   private String indexName;
64   private String indexType;
65   private int scrollContextTimeToLiveInMinutes;
66   private int numItemsToGetBulkRequest;
67
68   private RestDataProvider restDataProvider;
69   private ObjectMapper mapper;
70
71   /**
72    * Instantiates a new elastic search index cleaner.
73    *
74    * @param restDataProvider the rest data provider
75    * @param indexName the index name
76    * @param indexType the index type
77    * @param host the host
78    * @param port the port
79    * @param scrollContextTimeToLiveInMinutes the scroll context time to live in minutes
80    * @param numItemsToGetBulkRequest the num items to get bulk request
81    */
82   protected ElasticSearchIndexCleaner(RestDataProvider restDataProvider, String indexName,
83       String indexType, String host, String port, int scrollContextTimeToLiveInMinutes,
84       int numItemsToGetBulkRequest) {
85     this.restDataProvider = restDataProvider;
86     this.before = null;
87     this.after = null;
88     this.indexName = indexName;
89     this.indexType = indexType;
90     this.mapper = new ObjectMapper();
91     this.host = host;
92     this.port = port;
93     this.scrollContextTimeToLiveInMinutes = scrollContextTimeToLiveInMinutes;
94     this.numItemsToGetBulkRequest = numItemsToGetBulkRequest;
95   }
96
97   /* (non-Javadoc)
98    * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePreOperationCollection()
99    */
100   @Override
101   public OperationState populatePreOperationCollection() {
102
103     try {
104       before = retrieveAllDocumentIdentifiers();
105       return OperationState.OK;
106     } catch (Exception exc) {
107       LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage());
108       return OperationState.ERROR;
109     }
110
111   }
112
113   /* (non-Javadoc)
114    * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePostOperationCollection()
115    */
116   @Override
117   public OperationState populatePostOperationCollection() {
118     try {
119       after = retrieveAllDocumentIdentifiers();
120       return OperationState.OK;
121     } catch (Exception exc) {
122       LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage());
123       return OperationState.ERROR;
124     }
125   }
126
127   /* (non-Javadoc)
128    * @see org.openecomp.sparky.synchronizer.IndexCleaner#performCleanup()
129    */
130   @Override
131   public OperationState performCleanup() {
132     // TODO Auto-generated method stub
133     LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, indexName);
134
135     int sizeBefore = before.getSize();
136     int sizeAfter = after.getSize();
137
138     LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP_SIZE, String.valueOf(sizeBefore),
139         String.valueOf(sizeAfter));
140
141     /*
142      * If the processedImportIds size <= 0, then something has failed in the sync operation and we
143      * shouldn't do the selective delete right now.
144      */
145
146     if (sizeAfter > 0) {
147
148       Collection<String> presyncIds = before.getImportedObjectIds();
149       presyncIds.removeAll(after.getImportedObjectIds());
150
151       try {
152         LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, indexName, indexType,
153             String.valueOf(presyncIds.size()));
154
155         ObjectIdCollection bulkIds = new ObjectIdCollection();
156
157         Iterator<String> it = presyncIds.iterator();
158         int numItemsInBulkRequest = 0;
159         int numItemsRemainingToBeDeleted = presyncIds.size();
160
161         while (it.hasNext()) {
162
163           bulkIds.addObjectId(it.next());
164           numItemsInBulkRequest++;
165
166           if (numItemsInBulkRequest >= this.numItemsToGetBulkRequest) {
167             LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize()));
168             OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIds());
169             // pegCountersForElasticBulkDelete(bulkDeleteResult);
170             numItemsRemainingToBeDeleted -= numItemsInBulkRequest;
171             numItemsInBulkRequest = 0;
172             bulkIds.clear();
173           }
174         }
175
176         if (numItemsRemainingToBeDeleted > 0) {
177           LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize()));
178           OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIds());
179           // pegCountersForElasticBulkDelete(bulkDeleteResult);
180         }
181
182
183       } catch (Exception exc) {
184         LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, indexName, exc.getLocalizedMessage());
185
186       }
187     }
188
189     return OperationState.OK;
190   }
191
192   @Override
193   public String getIndexName() {
194     return indexName;
195   }
196
197   public void setIndexName(String indexName) {
198     this.indexName = indexName;
199   }
200
201   /**
202    * Builds the initial scroll request payload.
203    *
204    * @param numItemsToGetPerRequest the num items to get per request
205    * @param fieldList the field list
206    * @return the string
207    * @throws JsonProcessingException the json processing exception
208    */
209   protected String buildInitialScrollRequestPayload(int numItemsToGetPerRequest,
210       List<String> fieldList) throws JsonProcessingException {
211
212     ObjectNode rootNode = mapper.createObjectNode();
213     rootNode.put("size", numItemsToGetPerRequest);
214
215     ArrayNode fields = mapper.createArrayNode();
216
217     for (String f : fieldList) {
218       fields.add(f);
219     }
220
221     rootNode.set("fields", fields);
222
223     ObjectNode queryNode = mapper.createObjectNode();
224     queryNode.set("match_all", mapper.createObjectNode());
225
226     rootNode.set("query", queryNode);
227
228     return mapper.writeValueAsString(rootNode);
229
230   }
231
232   /**
233    * Builds the subsequent scroll context request payload.
234    *
235    * @param scrollId the scroll id
236    * @param contextTimeToLiveInMinutes the context time to live in minutes
237    * @return the string
238    * @throws JsonProcessingException the json processing exception
239    */
240   protected String buildSubsequentScrollContextRequestPayload(String scrollId,
241       int contextTimeToLiveInMinutes) throws JsonProcessingException {
242
243     ObjectNode rootNode = mapper.createObjectNode();
244
245     rootNode.put("scroll", contextTimeToLiveInMinutes + "m");
246     rootNode.put("scroll_id", scrollId);
247
248     return mapper.writeValueAsString(rootNode);
249
250   }
251
252   /**
253    * Parses the elastic search result.
254    *
255    * @param jsonResult the json result
256    * @return the json node
257    * @throws JsonProcessingException the json processing exception
258    * @throws IOException Signals that an I/O exception has occurred.
259    */
260   protected JsonNode parseElasticSearchResult(String jsonResult)
261       throws JsonProcessingException, IOException {
262     ObjectMapper mapper = new ObjectMapper();
263     return mapper.readTree(jsonResult);
264   }
265
266   /**
267    * Lookup index doc.
268    *
269    * @param ids the ids
270    * @param docs the docs
271    * @return the array list
272    */
273   protected ArrayList<SearchableEntity> lookupIndexDoc(ArrayList<String> ids,
274       List<SearchableEntity> docs) {
275     ArrayList<SearchableEntity> objs = new ArrayList<SearchableEntity>();
276
277     if (ids != null && docs != null) {
278       for (SearchableEntity d : docs) {
279         if (ids.contains(d.getId())) {
280           objs.add(d);
281         }
282       }
283     }
284
285     return objs;
286   }
287
288   /**
289    * Builds the delete data object.
290    *
291    * @param index the index
292    * @param type the type
293    * @param id the id
294    * @return the object node
295    */
296   protected ObjectNode buildDeleteDataObject(String index, String type, String id) {
297
298     ObjectNode indexDocProperties = mapper.createObjectNode();
299
300     indexDocProperties.put("_index", index);
301     indexDocProperties.put("_type", type);
302     indexDocProperties.put("_id", id);
303
304     ObjectNode rootNode = mapper.createObjectNode();
305     rootNode.set("delete", indexDocProperties);
306
307     return rootNode;
308   }
309
310   /**
311    * This method might appear to be a little strange, and is simply an optimization to take an
312    * elipsed JsonNode key path and retrieve the node at the end of the path, if it exists.
313    *
314    * @param startNode the start node
315    * @param fieldPath the field path
316    * @return the node path
317    */
318   protected JsonNode getNodePath(JsonNode startNode, String... fieldPath) {
319
320     JsonNode jsonNode = null;
321
322     for (String field : fieldPath) {
323       if (jsonNode == null) {
324         jsonNode = startNode.get(field);
325       } else {
326         jsonNode = jsonNode.get(field);
327       }
328
329       /*
330        * This is our safety net in case any intermediate path returns a null
331        */
332
333       if (jsonNode == null) {
334         return null;
335       }
336
337     }
338
339     return jsonNode;
340   }
341
342   /**
343    * Gets the full url.
344    *
345    * @param resourceUrl the resource url
346    * @return the full url
347    */
348   private String getFullUrl(String resourceUrl) {
349     return String.format("http://%s:%s%s", host, port, resourceUrl);
350   }
351
352   /**
353    * Retrieve all document identifiers.
354    *
355    * @return the object id collection
356    * @throws IOException Signals that an I/O exception has occurred.
357    */
358   public ObjectIdCollection retrieveAllDocumentIdentifiers() throws IOException {
359
360     ObjectIdCollection currentDocumentIds = new ObjectIdCollection();
361
362     long opStartTimeInMs = System.currentTimeMillis();
363
364     List<String> fields = new ArrayList<String>();
365     fields.add("_id");
366     // fields.add("entityType");
367
368     String scrollRequestPayload =
369         buildInitialScrollRequestPayload(this.numItemsToGetBulkRequest, fields);
370
371     final String fullUrlStr = getFullUrl("/" + indexName + "/" + indexType + "/_search?scroll="
372         + this.scrollContextTimeToLiveInMinutes + "m");
373
374     OperationResult result =
375         restDataProvider.doPost(fullUrlStr, scrollRequestPayload, "application/json");
376
377     if (result.wasSuccessful()) {
378
379       JsonNode rootNode = parseElasticSearchResult(result.getResult());
380
381       /*
382        * Check the result for success / failure, and enumerate all the index ids that resulted in
383        * success, and ignore the ones that failed or log them so we have a record of the failure.
384        */
385       int totalRecordsAvailable = 0;
386       String scrollId = null;
387       int numRecordsFetched = 0;
388
389       if (rootNode != null) {
390
391         scrollId = getFieldValue(rootNode, "_scroll_id");
392         final String tookStr = getFieldValue(rootNode, "took");
393         int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
394         boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
395
396         if (timedOut) {
397           LOG.error(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "all document Identifiers",
398               String.valueOf(tookInMs));
399         } else {
400           LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "all document Identifiers",
401               String.valueOf(tookInMs));
402         }
403
404         JsonNode hitsNode = rootNode.get("hits");
405         totalRecordsAvailable = Integer.parseInt(hitsNode.get("total").asText());
406
407         LOG.info(AaiUiMsgs.COLLECT_TOTAL, "all document Identifiers",
408             String.valueOf(totalRecordsAvailable));
409
410         /*
411          * Collect all object ids
412          */
413
414         ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
415
416         Iterator<JsonNode> nodeIterator = hitsArray.iterator();
417
418         String key = null;
419         String value = null;
420         JsonNode jsonNode = null;
421
422         while (nodeIterator.hasNext()) {
423
424           jsonNode = nodeIterator.next();
425
426           key = getFieldValue(jsonNode, "_id");
427
428           if (key != null) {
429             currentDocumentIds.addObjectId(key);
430           }
431
432           /*
433            * if (key != null) {
434            * 
435            * JsonNode fieldsNode = jNode.get("fields");
436            * 
437            * if (fieldsNode != null) {
438            * 
439            * JsonNode entityTypeNode = fieldsNode.get("entityType");
440            * 
441            * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode;
442            * 
443            * if (aNode.size() > 0) { value = aNode.get(0).asText(); objAndtTypesMap.put(key, value);
444            * numRecordsFetched++; } } } }
445            */
446
447         }
448
449         int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched);
450
451         int numRequiredAdditionalFetches =
452             (totalRecordsRemainingToFetch / this.numItemsToGetBulkRequest);
453
454         /*
455          * Do an additional fetch for the remaining items (if needed)
456          */
457
458         if (totalRecordsRemainingToFetch % numItemsToGetBulkRequest != 0) {
459           numRequiredAdditionalFetches += 1;
460         }
461
462         if (LOG.isDebugEnabled()) {
463           LOG.debug(AaiUiMsgs.SYNC_NUMBER_REQ_FETCHES,
464               String.valueOf(numRequiredAdditionalFetches));
465         }
466
467
468         for (int x = 0; x < numRequiredAdditionalFetches; x++) {
469
470           if (collectItemsFromScrollContext(scrollId, currentDocumentIds) != OperationState.OK) {
471             // abort the whole thing because now we can't reliably cleanup the orphans.
472             throw new IOException(
473                 "Failed to collect pre-sync doc collection from index.  Aborting operation");
474           }
475           if (LOG.isDebugEnabled()) {
476             LOG.debug(AaiUiMsgs.SYNC_NUMBER_TOTAL_FETCHES,
477                 String.valueOf(currentDocumentIds.getSize()),
478                 String.valueOf(totalRecordsAvailable));
479           }
480
481         }
482
483       }
484
485     } else {
486       // scroll context get failed, nothing else to do
487       LOG.error(AaiUiMsgs.ERROR_GENERIC, result.toString());
488     }
489
490     LOG.info(AaiUiMsgs.COLLECT_TOTAL_TIME, "all document Identifiers",
491         String.valueOf((System.currentTimeMillis() - opStartTimeInMs)));
492
493     return currentDocumentIds;
494
495   }
496
497   /**
498    * Collect items from scroll context.
499    *
500    * @param scrollId the scroll id
501    * @param objectIds the object ids
502    * @return the operation state
503    * @throws IOException Signals that an I/O exception has occurred.
504    */
505   private OperationState collectItemsFromScrollContext(String scrollId,
506       ObjectIdCollection objectIds) throws IOException {
507
508     // ObjectIdCollection documentIdCollection = new ObjectIdCollection();
509
510     String requestPayload =
511         buildSubsequentScrollContextRequestPayload(scrollId, scrollContextTimeToLiveInMinutes);
512
513     final String fullUrlStr = getFullUrl("/_search/scroll");
514
515     OperationResult opResult =
516         restDataProvider.doPost(fullUrlStr, requestPayload, "application/json");
517
518     if (opResult.getResultCode() >= 300) {
519       LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult());
520       return OperationState.ERROR;
521     }
522
523     JsonNode rootNode = parseElasticSearchResult(opResult.getResult());
524     boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
525     final String tookStr = getFieldValue(rootNode, "took");
526     int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
527
528     JsonNode hitsNode = rootNode.get("hits");
529
530     /*
531      * Check the result for success / failure, and enumerate all the index ids that resulted in
532      * success, and ignore the ones that failed or log them so we have a record of the failure.
533      */
534
535     if (rootNode != null) {
536
537       if (timedOut) {
538         LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs));
539       } else {
540         LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "Scroll Context", String.valueOf(tookInMs));
541       }
542
543       /*
544        * Collect all object ids
545        */
546
547       ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
548       String key = null;
549       String value = null;
550       JsonNode jsonNode = null;
551
552       Iterator<JsonNode> nodeIterator = hitsArray.iterator();
553
554       while (nodeIterator.hasNext()) {
555
556         jsonNode = nodeIterator.next();
557
558         key = getFieldValue(jsonNode, "_id");
559
560         if (key != null) {
561           objectIds.addObjectId(key);
562
563           /*
564            * JsonNode fieldsNode = jNode.get("fields");
565            * 
566            * if (fieldsNode != null) {
567            * 
568            * JsonNode entityTypeNode = fieldsNode.get("entityType");
569            * 
570            * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode;
571            * 
572            * if (aNode.size() > 0) { value = aNode.get(0).asText(); objectIdsAndTypes.put(key,
573            * value); } } } }
574            */
575
576         }
577
578       }
579     }
580
581     return OperationState.OK;
582   }
583
584   /**
585    * Gets the field value.
586    *
587    * @param node the node
588    * @param fieldName the field name
589    * @return the field value
590    */
591   protected String getFieldValue(JsonNode node, String fieldName) {
592
593     JsonNode field = node.get(fieldName);
594
595     if (field != null) {
596       return field.asText();
597     }
598
599     return null;
600
601   }
602
603   /**
604    * Bulk delete.
605    *
606    * @param docIds the doc ids
607    * @return the operation result
608    * @throws IOException Signals that an I/O exception has occurred.
609    */
610   public OperationResult bulkDelete(Collection<String> docIds) throws IOException {
611
612     if (docIds == null || docIds.size() == 0) {
613       LOG.info(AaiUiMsgs.ES_BULK_DELETE_SKIP);
614       return new OperationResult(500,
615           "Skipping bulkDelete(); operation because docs to delete list is empty");
616     }
617
618     LOG.info(AaiUiMsgs.ES_BULK_DELETE_START, String.valueOf(docIds.size()));
619
620     StringBuilder sb = new StringBuilder(128);
621
622     for (String id : docIds) {
623       sb.append(
624           String.format(BULK_OP_LINE_TEMPLATE, buildDeleteDataObject(indexName, indexType, id)));
625     }
626
627     sb.append("\n");
628
629     final String fullUrlStr = getFullUrl("/_bulk");
630
631     return restDataProvider.doPost(fullUrlStr, sb.toString(), "application/x-www-form-urlencoded");
632
633   }
634
635   /*
636   
637   */
638
639 }