cbcc5f864b92eb446f5f921a9b43a88521a0116e
[dcaegen2/services.git] /
1 /*
2 * ============LICENSE_START=======================================================
3 * ONAP : DATALAKE
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *     http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 */
20
21 package org.onap.datalake.feeder.service;
22
23 import java.io.IOException;
24 import java.util.List;
25
26 import javax.annotation.PostConstruct;
27 import javax.annotation.PreDestroy;
28
29 import org.apache.http.HttpHost;
30 import org.elasticsearch.ElasticsearchStatusException;
31 import org.elasticsearch.action.ActionListener;
32 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
33 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
34 import org.elasticsearch.action.bulk.BulkRequest;
35 import org.elasticsearch.action.bulk.BulkResponse;
36 import org.elasticsearch.action.index.IndexRequest;
37 import org.elasticsearch.client.RequestOptions;
38 import org.elasticsearch.client.RestClient;
39 import org.elasticsearch.client.RestHighLevelClient;
40 import org.elasticsearch.common.xcontent.XContentType;
41 import org.json.JSONObject;
42 import org.onap.datalake.feeder.config.ApplicationConfiguration;
43 import org.onap.datalake.feeder.domain.Topic;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 import org.springframework.beans.factory.annotation.Autowired;
48 import org.springframework.stereotype.Service;
49
50 /**
51  * Service to use Elasticsearch
52  * 
53  * @author Guobiao Mo
54  *
55  */
56 @Service
57 public class ElasticsearchService {
58
59         private final Logger log = LoggerFactory.getLogger(this.getClass());
60
61         @Autowired
62         private ApplicationConfiguration config;
63
64         private RestHighLevelClient client;
65         ActionListener<BulkResponse> listener;
66
67         @PostConstruct
68         private void init() {
69                 String elasticsearchHost = config.getElasticsearchHost();
70
71                 // Initialize the Connection
72                 client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
73
74                 log.info("Connect to Elasticsearch Host " + elasticsearchHost);
75
76                 listener = new ActionListener<BulkResponse>() {
77                         @Override
78                         public void onResponse(BulkResponse bulkResponse) {
79
80                         }
81
82                         @Override
83                         public void onFailure(Exception e) {
84                                 log.error(e.getMessage());
85                         }
86                 };
87         }
88
89         @PreDestroy
90         public void cleanUp() throws IOException {
91                 client.close();
92         }
93         
94         public void ensureTableExist(String topic) throws IOException {
95                 String topicLower = topic.toLowerCase();
96
97                 CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
98                 try {
99                         CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);          
100                         log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged());
101                 }catch(ElasticsearchStatusException e) {
102                         log.info("{} create ES topic status: {}", topic, e.getDetailedMessage());                       
103                 }
104         }
105         
106         //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
107         public void saveJsons(Topic topic, List<JSONObject> jsons) {
108                 BulkRequest request = new BulkRequest();
109
110                 for (JSONObject json : jsons) {
111                         if(topic.isCorrelateClearedMessage()) {
112                                 boolean found = correlateClearedMessage(json);
113                                 if(found) {
114                                         continue;
115                                 }
116                         }
117                         request.add(new IndexRequest(topic.getId().toLowerCase(), "doc").source(json.toString(), XContentType.JSON));
118                 }
119                 if(config.isAsync()) {
120                         client.bulkAsync(request, RequestOptions.DEFAULT, listener);                    
121                 }else {
122                         try {
123                                 client.bulk(request, RequestOptions.DEFAULT);
124                         } catch (IOException e) { 
125                                 log.error( topic.getId() , e);
126                         }
127                 }
128         }
129         
130         private boolean correlateClearedMessage(JSONObject json) {
131                 boolean found = false;
132                 
133                 /*TODO
134                  * 1. check if this is a alarm cleared message
135                  * 2. search previous alarm message
136                  * 3. update previous message, if success, set found=true
137                  */
138                 
139                 return found; 
140         }
141
142 }