import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.indices.CreateIndexRequest;
 import org.elasticsearch.client.indices.CreateIndexResponse;
 import org.elasticsearch.client.indices.GetIndexRequest; 
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.rest.RestStatus;
 import org.json.JSONObject;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Db;
 
                for (JSONObject json : jsons) {
                        if(topic.isCorrelateClearedMessage()) {
-                               boolean found = correlateClearedMessage(json);
+                               boolean found = correlateClearedMessage(topic, json);
                                if(found) {
                                        continue;
                                }
                        }
                        
                        String id = topic.getMessageId(json); //id can be null
-                       
-                       request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON));
+
+                       request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
                }
                if(config.isAsync()) {
                        client.bulkAsync(request, RequestOptions.DEFAULT, listener);                    
                        }
                }
        }
-       
-       private boolean correlateClearedMessage(JSONObject json) {
-               boolean found = true;
-                               
-               /*TODO
-                * 1. check if this is a alarm cleared message
-                * 2. search previous alarm message
-                * 3. update previous message, if success, set found=true
-                */
-               //for Sonar test, remove the following
-               if(json.isNull("kkkkk")) {
-                       found = false;
+
+       /**
+        *
+        * @param topic
+        * @param json
+        * @return boolean
+        *
+        * Because of query by id, The search API cannot be used for query.
+        * The search API can only query all data or based on the fields in the source.
+        * So use the get API, three parameters: index, type, document id
+        */
+       private boolean correlateClearedMessage(Topic topic, JSONObject json) {
+               boolean found = false;
+               String eName = null;
+
+               try {
+                       eName = json.query("/event/commonEventHeader/eventName").toString();
+
+                       if (StringUtils.isNotBlank(eName)) {
+
+                               if (eName.endsWith("Cleared")) {
+
+                                       String name = eName.substring(0, eName.length() - 7);
+                                       String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
+                                       String specificProblem = json.query("/event/faultFields/specificProblem").toString();
+
+                                       String id = null;
+                                       StringBuilder stringBuilder = new StringBuilder();
+                                       stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem);
+
+                                       id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb"
+                                       String index = topic.getName().toLowerCase();
+
+                                       //get
+                                       GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id);
+
+                                       GetResponse getResponse = null;
+                                       try {
+                                               getResponse = client.get(getRequest, RequestOptions.DEFAULT);
+                                               if (getResponse != null) {
+
+                                                       if (getResponse.isExists()) {
+                                                               String sourceAsString = getResponse.getSourceAsString();
+                                                               JSONObject jsonObject = new JSONObject(sourceAsString);
+                                                               jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed");
+                                                               String jsonString = jsonObject.toString();
+
+                                                               //update
+                                                               IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id);
+                                                               request.source(jsonString, XContentType.JSON);
+                                                               IndexResponse indexResponse = null;
+                                                               try {
+                                                                       indexResponse = client.index(request, RequestOptions.DEFAULT);
+                                                                       found = true;
+                                                               } catch (IOException e) {
+                                                                       log.error("save failure");
+                                                               }
+                                                       } else {
+                                                               log.error("The getResponse was not exists" );
+                                                       }
+
+                                               } else {
+                                                       log.error("The document for this id was not found" );
+                                               }
+
+                                       } catch (ElasticsearchException e) {
+                                               if (e.status() == RestStatus.NOT_FOUND) {
+                                                       log.error("The document for this id was not found" );
+                                               }
+                                               if (e.status() == RestStatus.CONFLICT) {
+                                                       log.error("Version conflict" );
+                                               }
+                                               log.error("Get document exception", e);
+                                       }catch (IOException e) {
+                                               log.error(topic.getName() , e);
+                                       }
+
+                               } else {
+                                       log.info("The data is normal");
+                               }
+
+                       } else {
+                               log.debug("event id null");
+                       }
+
+               } catch (Exception e) {
+                       log.error("error",e);
                }
-               
-               return found; 
+
+               return found;
        }
 
 }