2 * ============LICENSE_START=======================================================
 
   4 * ================================================================================
 
   5 * Copyright 2019 China Mobile
 
   6 *=================================================================================
 
   7 * Licensed under the Apache License, Version 2.0 (the "License");
 
   8 * you may not use this file except in compliance with the License.
 
   9 * You may obtain a copy of the License at
 
  11 *     http://www.apache.org/licenses/LICENSE-2.0
 
  13 * Unless required by applicable law or agreed to in writing, software
 
  14 * distributed under the License is distributed on an "AS IS" BASIS,
 
  15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16 * See the License for the specific language governing permissions and
 
  17 * limitations under the License.
 
  18 * ============LICENSE_END=========================================================
 
  21 package org.onap.datalake.feeder.service;
 
  23 import java.io.IOException;
 
  24 import java.util.List;
 
  26 import javax.annotation.PostConstruct;
 
  27 import javax.annotation.PreDestroy;
 
  29 import org.apache.http.HttpHost;
 
  30 import org.elasticsearch.ElasticsearchStatusException;
 
  31 import org.elasticsearch.action.ActionListener;
 
  32 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 
  33 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 
  34 import org.elasticsearch.action.bulk.BulkRequest;
 
  35 import org.elasticsearch.action.bulk.BulkResponse;
 
  36 import org.elasticsearch.action.index.IndexRequest;
 
  37 import org.elasticsearch.client.RequestOptions;
 
  38 import org.elasticsearch.client.RestClient;
 
  39 import org.elasticsearch.client.RestHighLevelClient;
 
  40 import org.elasticsearch.common.xcontent.XContentType;
 
  41 import org.json.JSONObject;
 
  42 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 
  43 import org.onap.datalake.feeder.domain.Topic;
 
  44 import org.slf4j.Logger;
 
  45 import org.slf4j.LoggerFactory;
 
  47 import org.springframework.beans.factory.annotation.Autowired;
 
  48 import org.springframework.stereotype.Service;
 
  51  * Service to use Elasticsearch
 
  57 public class ElasticsearchService {
 
  59         private final Logger log = LoggerFactory.getLogger(this.getClass());
 
  62         private ApplicationConfiguration config;
 
  64         private RestHighLevelClient client;
 
  65         ActionListener<BulkResponse> listener;
 
  69                 String elasticsearchHost = config.getElasticsearchHost();
 
  71                 // Initialize the Connection
 
  72                 client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
 
  74                 log.info("Connect to Elasticsearch Host " + elasticsearchHost);
 
  76                 listener = new ActionListener<BulkResponse>() {
 
  78                         public void onResponse(BulkResponse bulkResponse) {
 
  83                         public void onFailure(Exception e) {
 
  84                                 log.error(e.getMessage());
 
  90         public void cleanUp() throws IOException {
 
  94         public void ensureTableExist(String topic) throws IOException {
 
  95                 String topicLower = topic.toLowerCase();
 
  97                 CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
 
  99                         CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);          
 
 100                         log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged());
 
 101                 }catch(ElasticsearchStatusException e) {
 
 102                         log.info("{} create ES topic status: {}", topic, e.getDetailedMessage());                       
 
 106         //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
 
 107         public void saveJsons(Topic topic, List<JSONObject> jsons) {
 
 108                 BulkRequest request = new BulkRequest();
 
 110                 for (JSONObject json : jsons) {
 
 111                         if(topic.isCorrelateClearedMessage()) {
 
 112                                 boolean found = correlateClearedMessage(json);
 
 117                         request.add(new IndexRequest(topic.getId().toLowerCase(), "doc").source(json.toString(), XContentType.JSON));
 
 119                 if(config.isAsync()) {
 
 120                         client.bulkAsync(request, RequestOptions.DEFAULT, listener);                    
 
 123                                 client.bulk(request, RequestOptions.DEFAULT);
 
 124                         } catch (IOException e) { 
 
 125                                 log.error( topic.getId() , e);
 
 130         private boolean correlateClearedMessage(JSONObject json) {
 
 131                 boolean found = false;
 
 134                  * 1. check if this is a alarm cleared message
 
 135                  * 2. search previous alarm message
 
 136                  * 3. update previous message, if success, set found=true