09056c0f79b0a68f5df5f864e081bca7aad63dc1
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / sync / ElasticSearchIndexCleaner.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.sync;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Iterator;
29 import java.util.List;
30
31 import javax.ws.rs.core.MediaType;
32
33 import org.onap.aai.cl.api.Logger;
34 import org.onap.aai.cl.eelf.LoggerFactory;
35 import org.onap.aai.restclient.client.OperationResult;
36 import org.onap.aai.sparky.dal.ElasticSearchAdapter;
37 import org.onap.aai.sparky.logging.AaiUiMsgs;
38 import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
39 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
40 import org.onap.aai.sparky.sync.entity.ObjectIdCollection;
41 import org.onap.aai.sparky.sync.entity.SearchableEntity;
42 import org.onap.aai.sparky.sync.enumeration.OperationState;
43
44 import com.fasterxml.jackson.core.JsonProcessingException;
45 import com.fasterxml.jackson.databind.JsonNode;
46 import com.fasterxml.jackson.databind.ObjectMapper;
47 import com.fasterxml.jackson.databind.node.ArrayNode;
48 import com.fasterxml.jackson.databind.node.ObjectNode;
49
50 /**
51  * The Class ElasticSearchIndexCleaner.
52  */
53 public class ElasticSearchIndexCleaner implements IndexCleaner {
54
55   private static final Logger LOG =
56       LoggerFactory.getInstance().getLogger(ElasticSearchIndexCleaner.class);
57
58   private static final String BULK_OP_LINE_TEMPLATE = "%s\n";
59   private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
60
61   private ObjectIdCollection before;
62   private ObjectIdCollection after;
63
64   private ObjectMapper mapper;
65   private ElasticSearchAdapter esAdapter;
66   private ElasticSearchEndpointConfig endpointConfig;
67   private ElasticSearchSchemaConfig schemaConfig;
68
69   /**
70    * Instantiates a new elastic search index cleaner.
71    *
72    * @param restDataProvider the rest data provider
73    * @param indexName the index name
74    * @param indexType the index type
75    * @param host the host
76    * @param port the port
77    * @param scrollContextTimeToLiveInMinutes the scroll context time to live in minutes
78    * @param numItemsToGetBulkRequest the num items to get bulk request
79    */
80   public ElasticSearchIndexCleaner(ElasticSearchAdapter esAdapter, ElasticSearchEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig) {
81     this.esAdapter = esAdapter;
82     this.before = null;
83     this.after = null;
84     this.endpointConfig = endpointConfig;
85     this.schemaConfig = schemaConfig;
86     this.mapper = new ObjectMapper();
87   }
88
89   /*
90    * (non-Javadoc)
91    * 
92    * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePreOperationCollection()
93    */
94   @Override
95   public OperationState populatePreOperationCollection() {
96
97     try {
98       before = retrieveAllDocumentIdentifiers();
99       return OperationState.OK;
100     } catch (Exception exc) {
101       LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
102       return OperationState.ERROR;
103     }
104
105   }
106
107   /*
108    * (non-Javadoc)
109    * 
110    * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePostOperationCollection()
111    */
112   @Override
113   public OperationState populatePostOperationCollection() {
114     try {
115       after = retrieveAllDocumentIdentifiers();
116       return OperationState.OK;
117     } catch (Exception exc) {
118       LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
119       return OperationState.ERROR;
120     }
121   }
122
123   /*
124    * (non-Javadoc)
125    * 
126    * @see org.openecomp.sparky.synchronizer.IndexCleaner#performCleanup()
127    */
128   @Override
129   public OperationState performCleanup() {
130     // TODO Auto-generated method stub
131     LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, schemaConfig.getIndexName());
132
133     int sizeBefore = before.getSize();
134     int sizeAfter = after.getSize();
135
136     LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP_SIZE, String.valueOf(sizeBefore),
137         String.valueOf(sizeAfter));
138
139     /*
140      * If the processedImportIds size <= 0, then something has failed in the sync operation and we
141      * shouldn't do the selective delete right now.
142      */
143
144     if (sizeAfter > 0) {
145
146       Collection<String> presyncIds = before.getImportedObjectIds();
147       presyncIds.removeAll(after.getImportedObjectIds());
148
149       try {
150         LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, schemaConfig.getIndexName(), schemaConfig.getIndexDocType(),
151             String.valueOf(presyncIds.size()));
152
153         ObjectIdCollection bulkIds = new ObjectIdCollection();
154
155         Iterator<String> it = presyncIds.iterator();
156         int numItemsInBulkRequest = 0;
157         int numItemsRemainingToBeDeleted = presyncIds.size();
158
159         while (it.hasNext()) {
160
161           bulkIds.addObjectId(it.next());
162           numItemsInBulkRequest++;
163
164           if (numItemsInBulkRequest >= endpointConfig.getScrollContextBatchRequestSize()) {
165             LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(), String.valueOf(bulkIds.getSize()));
166             bulkDelete(bulkIds.getImportedObjectIds());
167             numItemsRemainingToBeDeleted -= numItemsInBulkRequest;
168             numItemsInBulkRequest = 0;
169             bulkIds.clear();
170           }
171         }
172
173         if (numItemsRemainingToBeDeleted > 0) {
174           LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(), String.valueOf(bulkIds.getSize()));
175           bulkDelete(bulkIds.getImportedObjectIds());
176         }
177
178
179       } catch (Exception exc) {
180         LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, schemaConfig.getIndexName(), exc.getLocalizedMessage());
181
182       }
183     }
184
185     return OperationState.OK;
186   }
187
188   @Override
189   public String getIndexName() {
190     return schemaConfig.getIndexName();
191   }
192
193   /**
194    * Builds the initial scroll request payload.
195    *
196    * @param numItemsToGetPerRequest the num items to get per request
197    * @param fieldList the field list
198    * @return the string
199    * @throws JsonProcessingException the json processing exception
200    */
201   protected String buildInitialScrollRequestPayload(int numItemsToGetPerRequest,
202       List<String> fieldList) throws JsonProcessingException {
203
204     ObjectNode rootNode = mapper.createObjectNode();
205     rootNode.put("size", numItemsToGetPerRequest);
206
207     ArrayNode fields = mapper.createArrayNode();
208
209     for (String f : fieldList) {
210       fields.add(f);
211     }
212
213     rootNode.set("fields", fields);
214
215     ObjectNode queryNode = mapper.createObjectNode();
216     queryNode.set("match_all", mapper.createObjectNode());
217
218     rootNode.set("query", queryNode);
219
220     return mapper.writeValueAsString(rootNode);
221
222   }
223
224   /**
225    * Builds the subsequent scroll context request payload.
226    *
227    * @param scrollId the scroll id
228    * @param contextTimeToLiveInMinutes the context time to live in minutes
229    * @return the string
230    * @throws JsonProcessingException the json processing exception
231    */
232   protected String buildSubsequentScrollContextRequestPayload(String scrollId,
233       int contextTimeToLiveInMinutes) throws JsonProcessingException {
234
235     ObjectNode rootNode = mapper.createObjectNode();
236
237     rootNode.put("scroll", contextTimeToLiveInMinutes + "m");
238     rootNode.put("scroll_id", scrollId);
239
240     return mapper.writeValueAsString(rootNode);
241
242   }
243
244   /**
245    * Parses the elastic search result.
246    *
247    * @param jsonResult the json result
248    * @return the json node
249    * @throws JsonProcessingException the json processing exception
250    * @throws IOException Signals that an I/O exception has occurred.
251    */
252   protected JsonNode parseElasticSearchResult(String jsonResult)
253       throws JsonProcessingException, IOException {
254     ObjectMapper mapper = new ObjectMapper();
255     return mapper.readTree(jsonResult);
256   }
257
258   /**
259    * Lookup index doc.
260    *
261    * @param ids the ids
262    * @param docs the docs
263    * @return the array list
264    */
265   protected ArrayList<SearchableEntity> lookupIndexDoc(ArrayList<String> ids,
266       List<SearchableEntity> docs) {
267     ArrayList<SearchableEntity> objs = new ArrayList<SearchableEntity>();
268
269     if (ids != null && docs != null) {
270       for (SearchableEntity d : docs) {
271         if (ids.contains(d.getId())) {
272           objs.add(d);
273         }
274       }
275     }
276
277     return objs;
278   }
279
280   /**
281    * Builds the delete data object.
282    *
283    * @param index the index
284    * @param type the type
285    * @param id the id
286    * @return the object node
287    */
288   protected ObjectNode buildDeleteDataObject(String index, String type, String id) {
289
290     ObjectNode indexDocProperties = mapper.createObjectNode();
291
292     indexDocProperties.put("_index", index);
293     indexDocProperties.put("_type", type);
294     indexDocProperties.put("_id", id);
295
296     ObjectNode rootNode = mapper.createObjectNode();
297     rootNode.set("delete", indexDocProperties);
298
299     return rootNode;
300   }
301
302   /**
303    * This method might appear to be a little strange, and is simply an optimization to take an
304    * elipsed JsonNode key path and retrieve the node at the end of the path, if it exists.
305    *
306    * @param startNode the start node
307    * @param fieldPath the field path
308    * @return the node path
309    */
310   protected JsonNode getNodePath(JsonNode startNode, String... fieldPath) {
311
312     JsonNode jsonNode = null;
313
314     for (String field : fieldPath) {
315       if (jsonNode == null) {
316         jsonNode = startNode.get(field);
317       } else {
318         jsonNode = jsonNode.get(field);
319       }
320
321       /*
322        * This is our safety net in case any intermediate path returns a null
323        */
324
325       if (jsonNode == null) {
326         return null;
327       }
328
329     }
330
331     return jsonNode;
332   }
333
334   /**
335    * Gets the full url.
336    *
337    * @param resourceUrl the resource url
338    * @return the full url
339    */
340   private String getFullUrl(String resourceUrl) {
341     return String.format("http://%s:%s%s", endpointConfig.getEsIpAddress(),
342         endpointConfig.getEsServerPort(), resourceUrl);
343   }
344
345   /**
346    * Retrieve all document identifiers.
347    *
348    * @return the object id collection
349    * @throws IOException Signals that an I/O exception has occurred.
350    */
351   public ObjectIdCollection retrieveAllDocumentIdentifiers() throws IOException {
352
353     ObjectIdCollection currentDocumentIds = new ObjectIdCollection();
354
355     long opStartTimeInMs = System.currentTimeMillis();
356
357     List<String> fields = new ArrayList<String>();
358     fields.add("_id");
359     // fields.add("entityType");
360
361     String scrollRequestPayload =
362         buildInitialScrollRequestPayload(endpointConfig.getScrollContextBatchRequestSize(), fields);
363
364     final String fullUrlStr = getFullUrl("/" + schemaConfig.getIndexName()+ "/" + schemaConfig.getIndexDocType() + "/_search?scroll="
365         + endpointConfig.getScrollContextTimeToLiveInMinutes() + "m");
366
367     OperationResult result =
368         esAdapter.doPost(fullUrlStr, scrollRequestPayload, MediaType.APPLICATION_JSON_TYPE);
369
370     if (result.wasSuccessful()) {
371
372       JsonNode rootNode = parseElasticSearchResult(result.getResult());
373
374       /*
375        * Check the result for success / failure, and enumerate all the index ids that resulted in
376        * success, and ignore the ones that failed or log them so we have a record of the failure.
377        */
378       int totalRecordsAvailable = 0;
379       String scrollId = null;
380       int numRecordsFetched = 0;
381
382       if (rootNode != null) {
383
384         scrollId = getFieldValue(rootNode, "_scroll_id");
385         final String tookStr = getFieldValue(rootNode, "took");
386         int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
387         boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
388
389         if (timedOut) {
390           LOG.error(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "all document Identifiers",
391               String.valueOf(tookInMs));
392         } else {
393           LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "all document Identifiers",
394               String.valueOf(tookInMs));
395         }
396
397         JsonNode hitsNode = rootNode.get("hits");
398         totalRecordsAvailable = Integer.parseInt(hitsNode.get("total").asText());
399
400         LOG.info(AaiUiMsgs.COLLECT_TOTAL, "all document Identifiers",
401             String.valueOf(totalRecordsAvailable));
402
403         /*
404          * Collect all object ids
405          */
406
407         ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
408
409         Iterator<JsonNode> nodeIterator = hitsArray.iterator();
410
411         String key = null;
412         String value = null;
413         JsonNode jsonNode = null;
414
415         while (nodeIterator.hasNext()) {
416
417           jsonNode = nodeIterator.next();
418
419           key = getFieldValue(jsonNode, "_id");
420
421           if (key != null) {
422             currentDocumentIds.addObjectId(key);
423           }
424
425         }
426
427         int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched);
428
429         int numRequiredAdditionalFetches =
430             (totalRecordsRemainingToFetch / endpointConfig.getScrollContextBatchRequestSize());
431
432         /*
433          * Do an additional fetch for the remaining items (if needed)
434          */
435
436         if (totalRecordsRemainingToFetch % endpointConfig.getScrollContextBatchRequestSize() != 0) {
437           numRequiredAdditionalFetches += 1;
438         }
439
440         if (LOG.isDebugEnabled()) {
441           LOG.debug(AaiUiMsgs.SYNC_NUMBER_REQ_FETCHES,
442               String.valueOf(numRequiredAdditionalFetches));
443         }
444
445
446         for (int x = 0; x < numRequiredAdditionalFetches; x++) {
447
448           if (collectItemsFromScrollContext(scrollId, currentDocumentIds) != OperationState.OK) {
449             // abort the whole thing because now we can't reliably cleanup the orphans.
450             throw new IOException(
451                 "Failed to collect pre-sync doc collection from index.  Aborting operation");
452           }
453           if (LOG.isDebugEnabled()) {
454             LOG.debug(AaiUiMsgs.SYNC_NUMBER_TOTAL_FETCHES,
455                 String.valueOf(currentDocumentIds.getSize()),
456                 String.valueOf(totalRecordsAvailable));
457           }
458
459         }
460
461       }
462
463     } else {
464       // scroll context get failed, nothing else to do
465       LOG.error(AaiUiMsgs.ERROR_GENERIC, result.toString());
466     }
467
468     LOG.info(AaiUiMsgs.COLLECT_TOTAL_TIME, "all document Identifiers",
469         String.valueOf((System.currentTimeMillis() - opStartTimeInMs)));
470
471     return currentDocumentIds;
472
473   }
474
475   /**
476    * Collect items from scroll context.
477    *
478    * @param scrollId the scroll id
479    * @param objectIds the object ids
480    * @return the operation state
481    * @throws IOException Signals that an I/O exception has occurred.
482    */
483   private OperationState collectItemsFromScrollContext(String scrollId,
484       ObjectIdCollection objectIds) throws IOException {
485
486     String requestPayload = buildSubsequentScrollContextRequestPayload(scrollId,
487         endpointConfig.getScrollContextTimeToLiveInMinutes());
488
489     final String fullUrlStr = getFullUrl("/_search/scroll");
490
491     OperationResult opResult =
492         esAdapter.doPost(fullUrlStr, requestPayload, MediaType.APPLICATION_JSON_TYPE);
493
494     if (opResult.getResultCode() >= 300) {
495       LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult());
496       return OperationState.ERROR;
497     }
498
499     JsonNode rootNode = parseElasticSearchResult(opResult.getResult());
500     boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
501     final String tookStr = getFieldValue(rootNode, "took");
502     int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
503
504     JsonNode hitsNode = rootNode.get("hits");
505
506     /*
507      * Check the result for success / failure, and enumerate all the index ids that resulted in
508      * success, and ignore the ones that failed or log them so we have a record of the failure.
509      */
510
511     if (rootNode != null) {
512
513       if (timedOut) {
514         LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs));
515       } else {
516         LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "Scroll Context", String.valueOf(tookInMs));
517       }
518
519       /*
520        * Collect all object ids
521        */
522
523       ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
524       String key = null;
525       String value = null;
526       JsonNode jsonNode = null;
527
528       Iterator<JsonNode> nodeIterator = hitsArray.iterator();
529
530       while (nodeIterator.hasNext()) {
531
532         jsonNode = nodeIterator.next();
533
534         key = getFieldValue(jsonNode, "_id");
535
536         if (key != null) {
537           objectIds.addObjectId(key);
538
539         }
540
541       }
542     }
543
544     return OperationState.OK;
545   }
546
547   /**
548    * Gets the field value.
549    *
550    * @param node the node
551    * @param fieldName the field name
552    * @return the field value
553    */
554   protected String getFieldValue(JsonNode node, String fieldName) {
555
556     JsonNode field = node.get(fieldName);
557
558     if (field != null) {
559       return field.asText();
560     }
561
562     return null;
563
564   }
565
566   /**
567    * Bulk delete.
568    *
569    * @param docIds the doc ids
570    * @return the operation result
571    * @throws IOException Signals that an I/O exception has occurred.
572    */
573   public OperationResult bulkDelete(Collection<String> docIds) throws IOException {
574
575     if (docIds == null || docIds.size() == 0) {
576       LOG.info(AaiUiMsgs.ES_BULK_DELETE_SKIP);
577       return new OperationResult(500,
578           "Skipping bulkDelete(); operation because docs to delete list is empty");
579     }
580
581     LOG.info(AaiUiMsgs.ES_BULK_DELETE_START, String.valueOf(docIds.size()));
582
583     StringBuilder sb = new StringBuilder(128);
584
585     for (String id : docIds) {
586       sb.append(String.format(BULK_OP_LINE_TEMPLATE,
587           buildDeleteDataObject(schemaConfig.getIndexName(), schemaConfig.getIndexDocType(), id)));
588     }
589
590     sb.append("\n");
591
592     final String fullUrlStr = getFullUrl("/_bulk");
593
594     return esAdapter.doPost(fullUrlStr, sb.toString(), MediaType.APPLICATION_FORM_URLENCODED_TYPE);
595
596   }
597
598   /*
599   
600   */
601
602 }