re base code
[sdc.git] / catalog-dao / src / main / java / org / openecomp / sdc / be / dao / titan / TitanGraphClient.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.dao.titan;
22
23 import com.thinkaurelius.titan.core.*;
24 import com.thinkaurelius.titan.core.schema.ConsistencyModifier;
25 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
26 import com.thinkaurelius.titan.core.schema.TitanManagement;
27 import com.thinkaurelius.titan.core.util.TitanCleanup;
28 import com.thinkaurelius.titan.diskstorage.ResourceUnavailableException;
29 import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
30 import com.thinkaurelius.titan.graphdb.database.idassigner.IDPoolExhaustedException;
31 import fj.data.Either;
32 import org.apache.commons.configuration.BaseConfiguration;
33 import org.apache.tinkerpop.gremlin.structure.T;
34 import org.apache.tinkerpop.gremlin.structure.Vertex;
35 import org.openecomp.sdc.be.config.BeEcompErrorManager;
36 import org.openecomp.sdc.be.config.ConfigurationManager;
37 import org.openecomp.sdc.be.dao.DAOTitanStrategy;
38 import org.openecomp.sdc.be.dao.TitanClientStrategy;
39 import org.openecomp.sdc.be.dao.neo4j.GraphPropertiesDictionary;
40 import org.openecomp.sdc.common.log.wrappers.Logger;
41 import org.springframework.stereotype.Component;
42
43 import javax.annotation.PostConstruct;
44 import java.util.ArrayList;
45 import java.util.HashMap;
46 import java.util.concurrent.*;
47
48
49 @Component("titan-client")
50 public class TitanGraphClient {
51
52         private static Logger logger = Logger.getLogger(TitanGraphClient.class.getName());
53         private static Logger healthLogger = Logger.getLogger("titan.healthcheck");
54
55         private static final String HEALTH_CHECK = GraphPropertiesDictionary.HEALTH_CHECK.getProperty();
56         private static final String OK = "GOOD";
57
58         public TitanGraphClient() {
59         }
60
61         private class HealthCheckTask implements Callable<Vertex> {
62                 @Override
63                 public Vertex call() {
64
65                         TitanVertex v = (TitanVertex) graph.query().has(HEALTH_CHECK, OK).vertices().iterator().next();
66                         TitanVertexProperty<String> property = v.property("healthcheck", OK + "_" + System.currentTimeMillis());
67                         healthLogger.trace("Health Check Node Found...{}", v.property(HEALTH_CHECK));
68                         graph.tx().commit();
69
70                         // Vertex v = graph.getVertices(HEALTH_CHECK, OK).iterator().next();
71                         // v.setProperty("healthcheck", OK + "_" +
72                         // System.currentTimeMillis());
73                         // graph.commit();
74                         // healthLogger.trace("Health Check Node
75                         // Found..."+v.getProperty(HEALTH_CHECK) );
76                         return v;
77                 }
78         }
79
80         private class HealthCheckScheduledTask implements Runnable {
81                 @Override
82                 public void run() {
83                         healthLogger.trace("Executing TITAN Health Check Task - Start");
84                         boolean healthStatus = isGraphOpen();
85                         healthLogger.trace("Executing TITAN Health Check Task - Status = {}", healthStatus);
86                         if (healthStatus != lastHealthState) {
87                                 logger.trace("TITAN  Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus);
88                                 lastHealthState = healthStatus;
89                                 logAlarm();
90                         }
91                 }
92         }
93
94         private class ReconnectTask implements Runnable {
95                 @Override
96                 public void run() {
97                         logger.trace("Trying to reconnect to Titan...");
98                         if (graph == null) {
99                                 createGraph(titanCfgFile);
100                         }
101                 }
102         }
103
104         private TitanGraph graph;
105
106         // Health Check Variables
107
108         /**
109          * This executor will execute the health check task on a callable task that can be executed with a timeout.
110          */
111         ExecutorService healthCheckExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
112                 @Override
113                 public Thread newThread(Runnable r) {
114                         return new Thread(r, "Titan-Health-Check-Thread");
115                 }
116         });
117         private long healthCheckReadTimeout = 2;
118         HealthCheckTask healthCallableTask = new HealthCheckTask();
119         HealthCheckScheduledTask healthCheckScheduledTask = new HealthCheckScheduledTask();
120         boolean lastHealthState = false;
121
122         // Reconnection variables
123         private ScheduledExecutorService reconnectScheduler = null;
124         private ScheduledExecutorService healthCheckScheduler = null;
125         private Runnable reconnectTask = null;
126         private long reconnectInterval = 3;
127         @SuppressWarnings("rawtypes")
128         private Future reconnectFuture;
129
130         private String titanCfgFile = null;
131         TitanClientStrategy titanClientStrategy;
132
133         public TitanGraphClient(TitanClientStrategy titanClientStrategy) {
134                 super();
135                 this.titanClientStrategy = titanClientStrategy;
136
137                 // Initialize a single threaded scheduler for health-check
138                 this.healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
139                         @Override
140                         public Thread newThread(Runnable r) {
141                                 return new Thread(r, "Titan-Health-Check-Task");
142                         }
143                 });
144
145                 healthCheckReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getTitanHealthCheckReadTimeout(2);
146                 reconnectInterval = ConfigurationManager.getConfigurationManager().getConfiguration().getTitanReconnectIntervalInSeconds(3);
147
148                 logger.info("** TitanGraphClient created");
149         }
150
151         @PostConstruct
152         public TitanOperationStatus createGraph() {
153
154                 logger.info("** createGraph started **");
155
156                 if (ConfigurationManager.getConfigurationManager().getConfiguration().getTitanInMemoryGraph()) {
157                         BaseConfiguration conf = new BaseConfiguration();
158                         conf.setProperty("storage.backend", "inmemory");
159                         graph = TitanFactory.open(conf);
160             createTitanSchema(); 
161                         logger.info("** in memory graph created");
162                         return TitanOperationStatus.OK;
163                 } else {
164                         this.titanCfgFile = titanClientStrategy.getConfigFile();
165                         if (titanCfgFile == null || titanCfgFile.isEmpty()) {
166                                 titanCfgFile = "config/titan.properties";
167                         }
168
169                         // yavivi
170                         // In case connection failed on init time, schedule a reconnect task
171                         // in the BG
172                         TitanOperationStatus status = createGraph(titanCfgFile);
173                         logger.debug("Create Titan graph status {}", status);
174                         if (status != TitanOperationStatus.OK) {
175                                 this.startReconnectTask();
176                         }
177
178                         return status;
179                 }
180         }
181
182         private void startHealthCheckTask() {
183                 this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, reconnectInterval, TimeUnit.SECONDS);
184         }
185
186         /**
187          * This method will be invoked ONLY on init time in case Titan storage is down.
188          */
189         private void startReconnectTask() {
190                 this.reconnectTask = new ReconnectTask();
191                 // Initialize a single threaded scheduler
192                 this.reconnectScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
193                         @Override
194                         public Thread newThread(Runnable r) {
195                                 return new Thread(r, "Titan-Reconnect-Task");
196                         }
197                 });
198
199                 logger.info("Scheduling reconnect task {} with interval of {} seconds", reconnectTask, reconnectInterval);
200                 reconnectFuture = this.reconnectScheduler.scheduleAtFixedRate(this.reconnectTask, 0, this.reconnectInterval, TimeUnit.SECONDS);
201         }
202
203         public void cleanupGraph() {
204                 if (graph != null) {
205                         // graph.shutdown();
206                         graph.close();
207                         TitanCleanup.clear(graph);
208                 }
209         }
210
211         private boolean graphInitialized(){
212                 TitanManagement graphMgmt = graph.openManagement();
213                 return graphMgmt.containsPropertyKey(HEALTH_CHECK) && graphMgmt.containsGraphIndex(HEALTH_CHECK);
214         }
215         
216
217         public TitanOperationStatus createGraph(String titanCfgFile) {
218                 logger.info("** open graph with {} started", titanCfgFile);
219                 try {
220                         logger.info("openGraph : try to load file {}", titanCfgFile);
221                         graph = TitanFactory.open(titanCfgFile);
222                         if (graph.isClosed() || !graphInitialized()) {
223                                 logger.error("titan graph was not initialized");
224                                 return TitanOperationStatus.NOT_CREATED;
225                         }
226
227                 } catch (Exception e) {
228                         this.graph = null;
229                         logger.info("createGraph : failed to open Titan graph with configuration file: {}", titanCfgFile);
230                         logger.debug("createGraph : failed with exception.", e);
231                         return TitanOperationStatus.NOT_CONNECTED;
232                 }
233
234                 logger.info("** Titan graph created ");
235
236                 // Do some post creation actions
237                 this.onGraphOpened();
238
239                 return TitanOperationStatus.OK;
240         }
241
242         private void onGraphOpened() {
243                 // if a reconnect task is running, cancel it.
244                 if (this.reconnectFuture != null) {
245                         logger.info("** Cancelling Titan reconnect task");
246                         reconnectFuture.cancel(true);
247                 }
248
249                 // create health-check node
250                 if (!graph.query().has(HEALTH_CHECK, OK).vertices().iterator().hasNext()) {
251                         logger.trace("Healthcheck Singleton node does not exist, Creating healthcheck node...");
252                         Vertex healthCheckNode = graph.addVertex();
253                         healthCheckNode.property(HEALTH_CHECK, OK);
254                         logger.trace("Healthcheck node created successfully. ID={}", healthCheckNode.property(T.id.getAccessor()));
255                         graph.tx().commit();
256                 } else {
257                         logger.trace("Skipping Healthcheck Singleton node creation. Already exist...");
258                 }
259                 this.startHealthCheckTask();
260         }
261
262
263         public Either<TitanGraph, TitanOperationStatus> getGraph() {
264                 if (graph != null) {
265                         return Either.left(graph);
266                 } else {
267                         return Either.right(TitanOperationStatus.NOT_CREATED);
268                 }
269         }
270
271         public TitanOperationStatus commit() {
272                 if (graph != null) {
273                         try {
274                                 graph.tx().commit();
275                                 return TitanOperationStatus.OK;
276                         } catch (Exception e) {
277                                 return handleTitanException(e);
278                         }
279                 } else {
280                         return TitanOperationStatus.NOT_CREATED;
281                 }
282         }
283
284         public TitanOperationStatus rollback() {
285                 if (graph != null) {
286                         try {
287                                 // graph.rollback();
288                                 graph.tx().rollback();
289                                 return TitanOperationStatus.OK;
290                         } catch (Exception e) {
291                                 return handleTitanException(e);
292                         }
293                 } else {
294                         return TitanOperationStatus.NOT_CREATED;
295                 }
296         }
297
298         public static TitanOperationStatus handleTitanException(Exception e) {
299                 if (e instanceof TitanConfigurationException) {
300                         return TitanOperationStatus.TITAN_CONFIGURATION;
301                 }
302                 if (e instanceof SchemaViolationException) {
303                         return TitanOperationStatus.TITAN_SCHEMA_VIOLATION;
304                 }
305                 if (e instanceof PermanentLockingException) {
306                         return TitanOperationStatus.TITAN_SCHEMA_VIOLATION;
307                 }
308                 if (e instanceof IDPoolExhaustedException) {
309                         return TitanOperationStatus.GENERAL_ERROR;
310                 }
311                 if (e instanceof InvalidElementException) {
312                         return TitanOperationStatus.INVALID_ELEMENT;
313                 }
314                 if (e instanceof InvalidIDException) {
315                         return TitanOperationStatus.INVALID_ID;
316                 }
317                 if (e instanceof QueryException) {
318                         return TitanOperationStatus.INVALID_QUERY;
319                 }
320                 if (e instanceof ResourceUnavailableException) {
321                         return TitanOperationStatus.RESOURCE_UNAVAILABLE;
322                 }
323                 if (e instanceof IllegalArgumentException) {
324                         // TODO check the error message??
325                         return TitanOperationStatus.ILLEGAL_ARGUMENT;
326                 }
327
328                 return TitanOperationStatus.GENERAL_ERROR;
329         }
330
331         public boolean getHealth() {
332                 return this.lastHealthState;
333         }
334
335         private boolean isGraphOpen() {
336                 healthLogger.trace("Invoking Titan health check ...");
337                 Vertex v = null;
338                 if (graph != null) {
339                         try {
340                                 Future<Vertex> future = healthCheckExecutor.submit(healthCallableTask);
341                                 v = future.get(this.healthCheckReadTimeout, TimeUnit.SECONDS);
342                                 healthLogger.trace("Health Check Node Found... {}", v.property(HEALTH_CHECK));
343                                 graph.tx().commit();
344                         } catch (Exception e) {
345                                 String message = e.getMessage();
346                                 if (message == null) {
347                                         message = e.getClass().getName();
348                                 }
349                                 logger.error("Titan Health Check Failed. {}", message);
350                                 return false;
351                         }
352                         return true;
353                 } else {
354                         return false;
355                 }
356         }
357
358
359         public static void main(String[] args) throws InterruptedException {
360                 TitanGraphClient client = new TitanGraphClient(new DAOTitanStrategy());
361                 client.createGraph();
362
363                 while (true) {
364                         boolean health = client.isGraphOpen();
365                         System.err.println("health=" + health);
366                         Thread.sleep(2000);
367                 }
368
369         }
370
371
372         private static final String TITAN_HEALTH_CHECK_STR = "titanHealthCheck";
373
374         private void logAlarm() {
375                 if (lastHealthState) {
376                         BeEcompErrorManager.getInstance().logBeHealthCheckTitanRecovery(TITAN_HEALTH_CHECK_STR);
377                 } else {
378                         BeEcompErrorManager.getInstance().logBeHealthCheckTitanError(TITAN_HEALTH_CHECK_STR);
379                 }
380         }
381         
382         private void createTitanSchema() {
383                 
384                 TitanManagement graphMgt = graph.openManagement();
385                 TitanGraphIndex index = null;
386                 for (GraphPropertiesDictionary prop : GraphPropertiesDictionary.values()) {
387                         PropertyKey propKey = null;
388                         if (!graphMgt.containsPropertyKey(prop.getProperty())) {
389                                 Class<?> clazz = prop.getClazz();
390                                 if (!ArrayList.class.getName().equals(clazz.getName()) && !HashMap.class.getName().equals(clazz.getName())) {
391                                         propKey = graphMgt.makePropertyKey(prop.getProperty()).dataType(prop.getClazz()).make();
392                                 }
393                         } else {
394                                 propKey = graphMgt.getPropertyKey(prop.getProperty());
395                         }
396                         if (prop.isIndexed()) {
397                                 if (!graphMgt.containsGraphIndex(prop.getProperty())) {
398                                         if (prop.isUnique()) {
399                                                 index = graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).unique().buildCompositeIndex();
400
401                                                 graphMgt.setConsistency(propKey, ConsistencyModifier.LOCK); // Ensures
402                                                                                                                                                                         // only
403                                                                                                                                                                         // one
404                                                                                                                                                                         // name
405                                                                                                                                                                         // per
406                                                                                                                                                                         // vertex
407                                                 graphMgt.setConsistency(index, ConsistencyModifier.LOCK); // Ensures
408                                                                                                                                                                         // name
409                                                                                                                                                                         // uniqueness
410                                                                                                                                                                         // in
411                                                                                                                                                                         // the
412                                                                                                                                                                         // graph
413
414                                         } else {
415                                                 graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).buildCompositeIndex();
416                                         }
417                                 }
418                         }
419                 }
420                 graphMgt.commit();
421         }
422
423 }