Add lombok support to simple classes
[sdc.git] / catalog-dao / src / main / java / org / openecomp / sdc / be / dao / titan / TitanGraphClient.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.dao.titan;
22
23 import com.thinkaurelius.titan.core.*;
24 import com.thinkaurelius.titan.core.schema.ConsistencyModifier;
25 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
26 import com.thinkaurelius.titan.core.schema.TitanManagement;
27 import com.thinkaurelius.titan.core.util.TitanCleanup;
28 import com.thinkaurelius.titan.diskstorage.ResourceUnavailableException;
29 import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
30 import com.thinkaurelius.titan.graphdb.database.idassigner.IDPoolExhaustedException;
31 import fj.data.Either;
32 import org.apache.commons.configuration.BaseConfiguration;
33 import org.apache.tinkerpop.gremlin.structure.T;
34 import org.apache.tinkerpop.gremlin.structure.Vertex;
35 import org.openecomp.sdc.be.config.BeEcompErrorManager;
36 import org.openecomp.sdc.be.config.ConfigurationManager;
37 import org.openecomp.sdc.be.dao.DAOTitanStrategy;
38 import org.openecomp.sdc.be.dao.TitanClientStrategy;
39 import org.openecomp.sdc.be.dao.neo4j.GraphPropertiesDictionary;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import org.springframework.stereotype.Component;
43
44 import javax.annotation.PostConstruct;
45 import java.util.ArrayList;
46 import java.util.HashMap;
47 import java.util.concurrent.*;
48
49
50 @Component("titan-client")
51 public class TitanGraphClient {
52         private static Logger logger = LoggerFactory.getLogger(TitanGraphClient.class.getName());
53         private static Logger healthLogger = LoggerFactory.getLogger("titan.healthcheck");
54
55         private static final String HEALTH_CHECK = GraphPropertiesDictionary.HEALTH_CHECK.getProperty();
56         private static final String OK = "GOOD";
57
58         public TitanGraphClient() {
59         }
60
61         private class HealthCheckTask implements Callable<Vertex> {
62                 @Override
63                 public Vertex call() {
64
65                         TitanVertex v = (TitanVertex) graph.query().has(HEALTH_CHECK, OK).vertices().iterator().next();
66                         TitanVertexProperty<String> property = v.property("healthcheck", OK + "_" + System.currentTimeMillis());
67                         healthLogger.trace("Health Check Node Found...{}", v.property(HEALTH_CHECK));
68                         graph.tx().commit();
69
70                         return v;
71                 }
72         }
73
74         private class HealthCheckScheduledTask implements Runnable {
75                 @Override
76                 public void run() {
77                         healthLogger.trace("Executing TITAN Health Check Task - Start");
78                         boolean healthStatus = isGraphOpen();
79                         healthLogger.trace("Executing TITAN Health Check Task - Status = {}", healthStatus);
80                         if (healthStatus != lastHealthState) {
81                                 logger.trace("TITAN  Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus);
82                                 lastHealthState = healthStatus;
83                                 logAlarm();
84                         }
85                 }
86         }
87
88         private class ReconnectTask implements Runnable {
89                 @Override
90                 public void run() {
91                         logger.trace("Trying to reconnect to Titan...");
92                         if (graph == null) {
93                                 createGraph(titanCfgFile);
94                         }
95                 }
96         }
97
98         private TitanGraph graph;
99
100         // Health Check Variables
101
102         /**
103          * This executor will execute the health check task on a callable task that can be executed with a timeout.
104          */
105         ExecutorService healthCheckExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
106                 @Override
107                 public Thread newThread(Runnable r) {
108                         return new Thread(r, "Titan-Health-Check-Thread");
109                 }
110         });
111         private long healthCheckReadTimeout = 2;
112         HealthCheckTask healthCallableTask = new HealthCheckTask();
113         HealthCheckScheduledTask healthCheckScheduledTask = new HealthCheckScheduledTask();
114         boolean lastHealthState = false;
115
116         // Reconnection variables
117         private ScheduledExecutorService reconnectScheduler = null;
118         private ScheduledExecutorService healthCheckScheduler = null;
119         private Runnable reconnectTask = null;
120         private long reconnectInterval = 3;
121         @SuppressWarnings("rawtypes")
122         private Future reconnectFuture;
123
124         private String titanCfgFile = null;
125         TitanClientStrategy titanClientStrategy;
126
127         public TitanGraphClient(TitanClientStrategy titanClientStrategy) {
128                 super();
129                 this.titanClientStrategy = titanClientStrategy;
130
131                 // Initialize a single threaded scheduler for health-check
132                 this.healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
133                         @Override
134                         public Thread newThread(Runnable r) {
135                                 return new Thread(r, "Titan-Health-Check-Task");
136                         }
137                 });
138
139                 healthCheckReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getTitanHealthCheckReadTimeout(2);
140                 reconnectInterval = ConfigurationManager.getConfigurationManager().getConfiguration().getTitanReconnectIntervalInSeconds(3);
141
142                 logger.info("** TitanGraphClient created");
143         }
144
145         @PostConstruct
146         public TitanOperationStatus createGraph() {
147
148                 logger.info("** createGraph started **");
149
150                 if (ConfigurationManager.getConfigurationManager().getConfiguration().getTitanInMemoryGraph()) {
151                         BaseConfiguration conf = new BaseConfiguration();
152                         conf.setProperty("storage.backend", "inmemory");
153                         graph = TitanFactory.open(conf);
154             createTitanSchema(); 
155                         logger.info("** in memory graph created");
156                         return TitanOperationStatus.OK;
157                 } else {
158                         this.titanCfgFile = titanClientStrategy.getConfigFile();
159                         if (titanCfgFile == null || titanCfgFile.isEmpty()) {
160                                 titanCfgFile = "config/titan.properties";
161                         }
162
163                         // yavivi
164                         // In case connection failed on init time, schedule a reconnect task
165                         // in the BG
166                         TitanOperationStatus status = createGraph(titanCfgFile);
167                         logger.debug("Create Titan graph status {}", status);
168                         if (status != TitanOperationStatus.OK) {
169                                 this.startReconnectTask();
170                         }
171
172                         return status;
173                 }
174         }
175
176         private void startHealthCheckTask() {
177                 this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, reconnectInterval, TimeUnit.SECONDS);
178         }
179
180         /**
181          * This method will be invoked ONLY on init time in case Titan storage is down.
182          */
183         private void startReconnectTask() {
184                 this.reconnectTask = new ReconnectTask();
185                 // Initialize a single threaded scheduler
186                 this.reconnectScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
187                         @Override
188                         public Thread newThread(Runnable r) {
189                                 return new Thread(r, "Titan-Reconnect-Task");
190                         }
191                 });
192
193                 logger.info("Scheduling reconnect task {} with interval of {} seconds", reconnectTask, reconnectInterval);
194                 reconnectFuture = this.reconnectScheduler.scheduleAtFixedRate(this.reconnectTask, 0, this.reconnectInterval, TimeUnit.SECONDS);
195         }
196
197         public void cleanupGraph() {
198                 if (graph != null) {
199                         // graph.shutdown();
200                         graph.close();
201                         TitanCleanup.clear(graph);
202                 }
203         }
204
205         private boolean graphInitialized(){
206                 TitanManagement graphMgmt = graph.openManagement();
207                 return graphMgmt.containsPropertyKey(HEALTH_CHECK) && graphMgmt.containsGraphIndex(HEALTH_CHECK);
208         }
209         
210
211         public TitanOperationStatus createGraph(String titanCfgFile) {
212                 logger.info("** open graph with {} started", titanCfgFile);
213                 try {
214                         logger.info("openGraph : try to load file {}", titanCfgFile);
215                         graph = TitanFactory.open(titanCfgFile);
216                         if (graph.isClosed() || !graphInitialized()) {
217                                 logger.error("titan graph was not initialized");
218                                 return TitanOperationStatus.NOT_CREATED;
219                         }
220
221                 } catch (Exception e) {
222                         this.graph = null;
223                         logger.info("createGraph : failed to open Titan graph with configuration file: {}", titanCfgFile);
224                         logger.debug("createGraph : failed with exception.", e);
225                         return TitanOperationStatus.NOT_CONNECTED;
226                 }
227
228                 logger.info("** Titan graph created ");
229
230                 // Do some post creation actions
231                 this.onGraphOpened();
232
233                 return TitanOperationStatus.OK;
234         }
235
236         private void onGraphOpened() {
237                 // if a reconnect task is running, cancel it.
238                 if (this.reconnectFuture != null) {
239                         logger.info("** Cancelling Titan reconnect task");
240                         reconnectFuture.cancel(true);
241                 }
242
243                 // create health-check node
244                 if (!graph.query().has(HEALTH_CHECK, OK).vertices().iterator().hasNext()) {
245                         logger.trace("Healthcheck Singleton node does not exist, Creating healthcheck node...");
246                         Vertex healthCheckNode = graph.addVertex();
247                         healthCheckNode.property(HEALTH_CHECK, OK);
248                         logger.trace("Healthcheck node created successfully. ID={}", healthCheckNode.property(T.id.getAccessor()));
249                         graph.tx().commit();
250                 } else {
251                         logger.trace("Skipping Healthcheck Singleton node creation. Already exist...");
252                 }
253                 this.startHealthCheckTask();
254         }
255
256
257         public Either<TitanGraph, TitanOperationStatus> getGraph() {
258                 if (graph != null) {
259                         return Either.left(graph);
260                 } else {
261                         return Either.right(TitanOperationStatus.NOT_CREATED);
262                 }
263         }
264
265         public TitanOperationStatus commit() {
266                 if (graph != null) {
267                         try {
268                                 graph.tx().commit();
269                                 return TitanOperationStatus.OK;
270                         } catch (Exception e) {
271                                 return handleTitanException(e);
272                         }
273                 } else {
274                         return TitanOperationStatus.NOT_CREATED;
275                 }
276         }
277
278         public TitanOperationStatus rollback() {
279                 if (graph != null) {
280                         try {
281                                 // graph.rollback();
282                                 graph.tx().rollback();
283                                 return TitanOperationStatus.OK;
284                         } catch (Exception e) {
285                                 return handleTitanException(e);
286                         }
287                 } else {
288                         return TitanOperationStatus.NOT_CREATED;
289                 }
290         }
291
292         public static TitanOperationStatus handleTitanException(Exception e) {
293                 if (e instanceof TitanConfigurationException) {
294                         return TitanOperationStatus.TITAN_CONFIGURATION;
295                 }
296                 if (e instanceof SchemaViolationException) {
297                         return TitanOperationStatus.TITAN_SCHEMA_VIOLATION;
298                 }
299                 if (e instanceof PermanentLockingException) {
300                         return TitanOperationStatus.TITAN_SCHEMA_VIOLATION;
301                 }
302                 if (e instanceof IDPoolExhaustedException) {
303                         return TitanOperationStatus.GENERAL_ERROR;
304                 }
305                 if (e instanceof InvalidElementException) {
306                         return TitanOperationStatus.INVALID_ELEMENT;
307                 }
308                 if (e instanceof InvalidIDException) {
309                         return TitanOperationStatus.INVALID_ID;
310                 }
311                 if (e instanceof QueryException) {
312                         return TitanOperationStatus.INVALID_QUERY;
313                 }
314                 if (e instanceof ResourceUnavailableException) {
315                         return TitanOperationStatus.RESOURCE_UNAVAILABLE;
316                 }
317                 if (e instanceof IllegalArgumentException) {
318                         // TODO check the error message??
319                         return TitanOperationStatus.ILLEGAL_ARGUMENT;
320                 }
321
322                 return TitanOperationStatus.GENERAL_ERROR;
323         }
324
325         public boolean getHealth() {
326                 return this.lastHealthState;
327         }
328
329         private boolean isGraphOpen() {
330                 healthLogger.trace("Invoking Titan health check ...");
331                 Vertex v = null;
332                 if (graph != null) {
333                         try {
334                                 Future<Vertex> future = healthCheckExecutor.submit(healthCallableTask);
335                                 v = future.get(this.healthCheckReadTimeout, TimeUnit.SECONDS);
336                                 healthLogger.trace("Health Check Node Found... {}", v.property(HEALTH_CHECK));
337                                 graph.tx().commit();
338                         } catch (Exception e) {
339                                 String message = e.getMessage();
340                                 if (message == null) {
341                                         message = e.getClass().getName();
342                                 }
343                                 logger.error("Titan Health Check Failed. {}", message);
344                                 return false;
345                         }
346                         return true;
347                 } else {
348                         return false;
349                 }
350         }
351
352
353         public static void main(String[] args) throws InterruptedException {
354                 TitanGraphClient client = new TitanGraphClient(new DAOTitanStrategy());
355                 client.createGraph();
356
357                 while (true) {
358                         boolean health = client.isGraphOpen();
359                         System.err.println("health=" + health);
360                         Thread.sleep(2000);
361                 }
362
363         }
364
365
366         private static final String TITAN_HEALTH_CHECK_STR = "titanHealthCheck";
367
368         private void logAlarm() {
369                 if (lastHealthState) {
370                         BeEcompErrorManager.getInstance().logBeHealthCheckTitanRecovery(TITAN_HEALTH_CHECK_STR);
371                 } else {
372                         BeEcompErrorManager.getInstance().logBeHealthCheckTitanError(TITAN_HEALTH_CHECK_STR);
373                 }
374         }
375         
376         private void createTitanSchema() {
377                 
378                 TitanManagement graphMgt = graph.openManagement();
379                 TitanGraphIndex index = null;
380                 for (GraphPropertiesDictionary prop : GraphPropertiesDictionary.values()) {
381                         PropertyKey propKey = null;
382                         if (!graphMgt.containsPropertyKey(prop.getProperty())) {
383                                 Class<?> clazz = prop.getClazz();
384                                 if (!clazz.isAssignableFrom(ArrayList.class) && !clazz.isAssignableFrom(HashMap.class)) {
385                                         propKey = graphMgt.makePropertyKey(prop.getProperty()).dataType(prop.getClazz()).make();
386                                 }
387                         } else {
388                                 propKey = graphMgt.getPropertyKey(prop.getProperty());
389                         }
390                         if (prop.isIndexed()) {
391                                 if (!graphMgt.containsGraphIndex(prop.getProperty())) {
392                                         if (prop.isUnique()) {
393                                                 index = graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).unique().buildCompositeIndex();
394                                                 // Ensures only one name per vertex
395                                                 graphMgt.setConsistency(propKey, ConsistencyModifier.LOCK);
396                                                 // Ensures name uniqueness in the graph
397                                                 graphMgt.setConsistency(index, ConsistencyModifier.LOCK);
398
399                                         } else {
400                                                 graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).buildCompositeIndex();
401                                         }
402                                 }
403                         }
404                 }
405                 graphMgt.commit();
406         }
407
408 }