Adding interfaces in documentation
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / sync / ElasticSearchIndexCleaner.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.sync;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Iterator;
27 import java.util.List;
28
29 import javax.ws.rs.core.MediaType;
30
31 import org.onap.aai.cl.api.Logger;
32 import org.onap.aai.cl.eelf.LoggerFactory;
33 import org.onap.aai.restclient.client.OperationResult;
34 import org.onap.aai.sparky.dal.ElasticSearchAdapter;
35 import org.onap.aai.sparky.logging.AaiUiMsgs;
36 import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
37 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
38 import org.onap.aai.sparky.sync.entity.ObjectIdCollection;
39 import org.onap.aai.sparky.sync.entity.SearchableEntity;
40 import org.onap.aai.sparky.sync.enumeration.OperationState;
41
42 import com.fasterxml.jackson.core.JsonProcessingException;
43 import com.fasterxml.jackson.databind.JsonNode;
44 import com.fasterxml.jackson.databind.ObjectMapper;
45 import com.fasterxml.jackson.databind.node.ArrayNode;
46 import com.fasterxml.jackson.databind.node.ObjectNode;
47
48 /**
49  * The Class ElasticSearchIndexCleaner.
50  */
51 public class ElasticSearchIndexCleaner implements IndexCleaner {
52
53   private static final Logger LOG =
54       LoggerFactory.getInstance().getLogger(ElasticSearchIndexCleaner.class);
55
56   private static final String BULK_OP_LINE_TEMPLATE = "%s\n";
57   private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
58
59   private ObjectIdCollection before;
60   private ObjectIdCollection after;
61
62   private ObjectMapper mapper;
63   private ElasticSearchAdapter esAdapter;
64   private ElasticSearchEndpointConfig endpointConfig;
65   private ElasticSearchSchemaConfig schemaConfig;
66
67   /**
68    * Instantiates a new elastic search index cleaner.
69    *
70    * @param restDataProvider the rest data provider
71    * @param indexName the index name
72    * @param indexType the index type
73    * @param host the host
74    * @param port the port
75    * @param scrollContextTimeToLiveInMinutes the scroll context time to live in minutes
76    * @param numItemsToGetBulkRequest the num items to get bulk request
77    */
78   public ElasticSearchIndexCleaner(ElasticSearchAdapter esAdapter, ElasticSearchEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig) {
79     this.esAdapter = esAdapter;
80     this.before = null;
81     this.after = null;
82     this.endpointConfig = endpointConfig;
83     this.schemaConfig = schemaConfig;
84     this.mapper = new ObjectMapper();
85   }
86
87   /*
88    * (non-Javadoc)
89    * 
90    * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePreOperationCollection()
91    */
92   @Override
93   public OperationState populatePreOperationCollection() {
94
95     try {
96       before = retrieveAllDocumentIdentifiers();
97       return OperationState.OK;
98     } catch (Exception exc) {
99       LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
100       return OperationState.ERROR;
101     }
102
103   }
104
105   /*
106    * (non-Javadoc)
107    * 
108    * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePostOperationCollection()
109    */
110   @Override
111   public OperationState populatePostOperationCollection() {
112     try {
113       after = retrieveAllDocumentIdentifiers();
114       return OperationState.OK;
115     } catch (Exception exc) {
116       LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
117       return OperationState.ERROR;
118     }
119   }
120
121   /*
122    * (non-Javadoc)
123    * 
124    * @see org.openecomp.sparky.synchronizer.IndexCleaner#performCleanup()
125    */
126   @Override
127   public OperationState performCleanup() {
128     // TODO Auto-generated method stub
129     LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, schemaConfig.getIndexName());
130
131     int sizeBefore = before.getSize();
132     int sizeAfter = after.getSize();
133
134     LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP_SIZE, String.valueOf(sizeBefore),
135         String.valueOf(sizeAfter));
136
137     /*
138      * If the processedImportIds size <= 0, then something has failed in the sync operation and we
139      * shouldn't do the selective delete right now.
140      */
141
142     if (sizeAfter > 0) {
143
144       Collection<String> presyncIds = before.getImportedObjectIds();
145       presyncIds.removeAll(after.getImportedObjectIds());
146
147       try {
148         LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, schemaConfig.getIndexName(), schemaConfig.getIndexDocType(),
149             String.valueOf(presyncIds.size()));
150
151         ObjectIdCollection bulkIds = new ObjectIdCollection();
152
153         Iterator<String> it = presyncIds.iterator();
154         int numItemsInBulkRequest = 0;
155         int numItemsRemainingToBeDeleted = presyncIds.size();
156
157         while (it.hasNext()) {
158
159           bulkIds.addObjectId(it.next());
160           numItemsInBulkRequest++;
161
162           if (numItemsInBulkRequest >= endpointConfig.getScrollContextBatchRequestSize()) {
163             LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(), String.valueOf(bulkIds.getSize()));
164             bulkDelete(bulkIds.getImportedObjectIds());
165             numItemsRemainingToBeDeleted -= numItemsInBulkRequest;
166             numItemsInBulkRequest = 0;
167             bulkIds.clear();
168           }
169         }
170
171         if (numItemsRemainingToBeDeleted > 0) {
172           LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(), String.valueOf(bulkIds.getSize()));
173           bulkDelete(bulkIds.getImportedObjectIds());
174         }
175
176
177       } catch (Exception exc) {
178         LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, schemaConfig.getIndexName(), exc.getLocalizedMessage());
179
180       }
181     }
182
183     return OperationState.OK;
184   }
185
186   @Override
187   public String getIndexName() {
188     return schemaConfig.getIndexName();
189   }
190
191   /**
192    * Builds the initial scroll request payload.
193    *
194    * @param numItemsToGetPerRequest the num items to get per request
195    * @param fieldList the field list
196    * @return the string
197    * @throws JsonProcessingException the json processing exception
198    */
199   protected String buildInitialScrollRequestPayload(int numItemsToGetPerRequest,
200       List<String> fieldList) throws JsonProcessingException {
201
202     ObjectNode rootNode = mapper.createObjectNode();
203     rootNode.put("size", numItemsToGetPerRequest);
204
205     ArrayNode fields = mapper.createArrayNode();
206
207     for (String f : fieldList) {
208       fields.add(f);
209     }
210
211     rootNode.set("fields", fields);
212
213     ObjectNode queryNode = mapper.createObjectNode();
214     queryNode.set("match_all", mapper.createObjectNode());
215
216     rootNode.set("query", queryNode);
217
218     return mapper.writeValueAsString(rootNode);
219
220   }
221
222   /**
223    * Builds the subsequent scroll context request payload.
224    *
225    * @param scrollId the scroll id
226    * @param contextTimeToLiveInMinutes the context time to live in minutes
227    * @return the string
228    * @throws JsonProcessingException the json processing exception
229    */
230   protected String buildSubsequentScrollContextRequestPayload(String scrollId,
231       int contextTimeToLiveInMinutes) throws JsonProcessingException {
232
233     ObjectNode rootNode = mapper.createObjectNode();
234
235     rootNode.put("scroll", contextTimeToLiveInMinutes + "m");
236     rootNode.put("scroll_id", scrollId);
237
238     return mapper.writeValueAsString(rootNode);
239
240   }
241
242   /**
243    * Parses the elastic search result.
244    *
245    * @param jsonResult the json result
246    * @return the json node
247    * @throws JsonProcessingException the json processing exception
248    * @throws IOException Signals that an I/O exception has occurred.
249    */
250   protected JsonNode parseElasticSearchResult(String jsonResult)
251       throws JsonProcessingException, IOException {
252     ObjectMapper mapper = new ObjectMapper();
253     return mapper.readTree(jsonResult);
254   }
255
256   /**
257    * Lookup index doc.
258    *
259    * @param ids the ids
260    * @param docs the docs
261    * @return the array list
262    */
263   protected ArrayList<SearchableEntity> lookupIndexDoc(ArrayList<String> ids,
264       List<SearchableEntity> docs) {
265     ArrayList<SearchableEntity> objs = new ArrayList<SearchableEntity>();
266
267     if (ids != null && docs != null) {
268       for (SearchableEntity d : docs) {
269         if (ids.contains(d.getId())) {
270           objs.add(d);
271         }
272       }
273     }
274
275     return objs;
276   }
277
278   /**
279    * Builds the delete data object.
280    *
281    * @param index the index
282    * @param type the type
283    * @param id the id
284    * @return the object node
285    */
286   protected ObjectNode buildDeleteDataObject(String index, String type, String id) {
287
288     ObjectNode indexDocProperties = mapper.createObjectNode();
289
290     indexDocProperties.put("_index", index);
291     indexDocProperties.put("_type", type);
292     indexDocProperties.put("_id", id);
293
294     ObjectNode rootNode = mapper.createObjectNode();
295     rootNode.set("delete", indexDocProperties);
296
297     return rootNode;
298   }
299
300   /**
301    * This method might appear to be a little strange, and is simply an optimization to take an
302    * elipsed JsonNode key path and retrieve the node at the end of the path, if it exists.
303    *
304    * @param startNode the start node
305    * @param fieldPath the field path
306    * @return the node path
307    */
308   protected JsonNode getNodePath(JsonNode startNode, String... fieldPath) {
309
310     JsonNode jsonNode = null;
311
312     for (String field : fieldPath) {
313       if (jsonNode == null) {
314         jsonNode = startNode.get(field);
315       } else {
316         jsonNode = jsonNode.get(field);
317       }
318
319       /*
320        * This is our safety net in case any intermediate path returns a null
321        */
322
323       if (jsonNode == null) {
324         return null;
325       }
326
327     }
328
329     return jsonNode;
330   }
331
332   /**
333    * Gets the full url.
334    *
335    * @param resourceUrl the resource url
336    * @return the full url
337    */
338   private String getFullUrl(String resourceUrl) {
339     return String.format("http://%s:%s%s", endpointConfig.getEsIpAddress(),
340         endpointConfig.getEsServerPort(), resourceUrl);
341   }
342
343   /**
344    * Retrieve all document identifiers.
345    *
346    * @return the object id collection
347    * @throws IOException Signals that an I/O exception has occurred.
348    */
349   public ObjectIdCollection retrieveAllDocumentIdentifiers() throws IOException {
350
351     ObjectIdCollection currentDocumentIds = new ObjectIdCollection();
352
353     long opStartTimeInMs = System.currentTimeMillis();
354
355     List<String> fields = new ArrayList<String>();
356     fields.add("_id");
357     // fields.add("entityType");
358
359     String scrollRequestPayload =
360         buildInitialScrollRequestPayload(endpointConfig.getScrollContextBatchRequestSize(), fields);
361
362     final String fullUrlStr = getFullUrl("/" + schemaConfig.getIndexName()+ "/" + schemaConfig.getIndexDocType() + "/_search?scroll="
363         + endpointConfig.getScrollContextTimeToLiveInMinutes() + "m");
364
365     OperationResult result =
366         esAdapter.doPost(fullUrlStr, scrollRequestPayload, MediaType.APPLICATION_JSON_TYPE);
367
368     if (result.wasSuccessful()) {
369
370       JsonNode rootNode = parseElasticSearchResult(result.getResult());
371
372       /*
373        * Check the result for success / failure, and enumerate all the index ids that resulted in
374        * success, and ignore the ones that failed or log them so we have a record of the failure.
375        */
376       int totalRecordsAvailable = 0;
377       String scrollId = null;
378       int numRecordsFetched = 0;
379
380       if (rootNode != null) {
381
382         scrollId = getFieldValue(rootNode, "_scroll_id");
383         final String tookStr = getFieldValue(rootNode, "took");
384         int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
385         boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
386
387         if (timedOut) {
388           LOG.error(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "all document Identifiers",
389               String.valueOf(tookInMs));
390         } else {
391           LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "all document Identifiers",
392               String.valueOf(tookInMs));
393         }
394
395         JsonNode hitsNode = rootNode.get("hits");
396         totalRecordsAvailable = Integer.parseInt(hitsNode.get("total").asText());
397
398         LOG.info(AaiUiMsgs.COLLECT_TOTAL, "all document Identifiers",
399             String.valueOf(totalRecordsAvailable));
400
401         /*
402          * Collect all object ids
403          */
404
405         ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
406
407         Iterator<JsonNode> nodeIterator = hitsArray.iterator();
408
409         String key = null;
410         String value = null;
411         JsonNode jsonNode = null;
412
413         while (nodeIterator.hasNext()) {
414
415           jsonNode = nodeIterator.next();
416
417           key = getFieldValue(jsonNode, "_id");
418
419           if (key != null) {
420             currentDocumentIds.addObjectId(key);
421           }
422
423         }
424
425         int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched);
426
427         int numRequiredAdditionalFetches =
428             (totalRecordsRemainingToFetch / endpointConfig.getScrollContextBatchRequestSize());
429
430         /*
431          * Do an additional fetch for the remaining items (if needed)
432          */
433
434         if (totalRecordsRemainingToFetch % endpointConfig.getScrollContextBatchRequestSize() != 0) {
435           numRequiredAdditionalFetches += 1;
436         }
437
438         if (LOG.isDebugEnabled()) {
439           LOG.debug(AaiUiMsgs.SYNC_NUMBER_REQ_FETCHES,
440               String.valueOf(numRequiredAdditionalFetches));
441         }
442
443
444         for (int x = 0; x < numRequiredAdditionalFetches; x++) {
445
446           if (collectItemsFromScrollContext(scrollId, currentDocumentIds) != OperationState.OK) {
447             // abort the whole thing because now we can't reliably cleanup the orphans.
448             throw new IOException(
449                 "Failed to collect pre-sync doc collection from index.  Aborting operation");
450           }
451           if (LOG.isDebugEnabled()) {
452             LOG.debug(AaiUiMsgs.SYNC_NUMBER_TOTAL_FETCHES,
453                 String.valueOf(currentDocumentIds.getSize()),
454                 String.valueOf(totalRecordsAvailable));
455           }
456
457         }
458
459       }
460
461     } else {
462       // scroll context get failed, nothing else to do
463       LOG.error(AaiUiMsgs.ERROR_GENERIC, result.toString());
464     }
465
466     LOG.info(AaiUiMsgs.COLLECT_TOTAL_TIME, "all document Identifiers",
467         String.valueOf((System.currentTimeMillis() - opStartTimeInMs)));
468
469     return currentDocumentIds;
470
471   }
472
473   /**
474    * Collect items from scroll context.
475    *
476    * @param scrollId the scroll id
477    * @param objectIds the object ids
478    * @return the operation state
479    * @throws IOException Signals that an I/O exception has occurred.
480    */
481   private OperationState collectItemsFromScrollContext(String scrollId,
482       ObjectIdCollection objectIds) throws IOException {
483
484     String requestPayload = buildSubsequentScrollContextRequestPayload(scrollId,
485         endpointConfig.getScrollContextTimeToLiveInMinutes());
486
487     final String fullUrlStr = getFullUrl("/_search/scroll");
488
489     OperationResult opResult =
490         esAdapter.doPost(fullUrlStr, requestPayload, MediaType.APPLICATION_JSON_TYPE);
491
492     if (opResult.getResultCode() >= 300) {
493       LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult());
494       return OperationState.ERROR;
495     }
496
497     JsonNode rootNode = parseElasticSearchResult(opResult.getResult());
498     boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
499     final String tookStr = getFieldValue(rootNode, "took");
500     int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
501
502     JsonNode hitsNode = rootNode.get("hits");
503
504     /*
505      * Check the result for success / failure, and enumerate all the index ids that resulted in
506      * success, and ignore the ones that failed or log them so we have a record of the failure.
507      */
508
509     if (rootNode != null) {
510
511       if (timedOut) {
512         LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs));
513       } else {
514         LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "Scroll Context", String.valueOf(tookInMs));
515       }
516
517       /*
518        * Collect all object ids
519        */
520
521       ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
522       String key = null;
523       String value = null;
524       JsonNode jsonNode = null;
525
526       Iterator<JsonNode> nodeIterator = hitsArray.iterator();
527
528       while (nodeIterator.hasNext()) {
529
530         jsonNode = nodeIterator.next();
531
532         key = getFieldValue(jsonNode, "_id");
533
534         if (key != null) {
535           objectIds.addObjectId(key);
536
537         }
538
539       }
540     }
541
542     return OperationState.OK;
543   }
544
545   /**
546    * Gets the field value.
547    *
548    * @param node the node
549    * @param fieldName the field name
550    * @return the field value
551    */
552   protected String getFieldValue(JsonNode node, String fieldName) {
553
554     JsonNode field = node.get(fieldName);
555
556     if (field != null) {
557       return field.asText();
558     }
559
560     return null;
561
562   }
563
564   /**
565    * Bulk delete.
566    *
567    * @param docIds the doc ids
568    * @return the operation result
569    * @throws IOException Signals that an I/O exception has occurred.
570    */
571   public OperationResult bulkDelete(Collection<String> docIds) throws IOException {
572
573     if (docIds == null || docIds.size() == 0) {
574       LOG.info(AaiUiMsgs.ES_BULK_DELETE_SKIP);
575       return new OperationResult(500,
576           "Skipping bulkDelete(); operation because docs to delete list is empty");
577     }
578
579     LOG.info(AaiUiMsgs.ES_BULK_DELETE_START, String.valueOf(docIds.size()));
580
581     StringBuilder sb = new StringBuilder(128);
582
583     for (String id : docIds) {
584       sb.append(String.format(BULK_OP_LINE_TEMPLATE,
585           buildDeleteDataObject(schemaConfig.getIndexName(), schemaConfig.getIndexDocType(), id)));
586     }
587
588     sb.append("\n");
589
590     final String fullUrlStr = getFullUrl("/_bulk");
591
592     return esAdapter.doPost(fullUrlStr, sb.toString(), MediaType.APPLICATION_FORM_URLENCODED_TYPE);
593
594   }
595
596   /*
597   
598   */
599
600 }