2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.dao.titan;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.TimeUnit;
33 import javax.annotation.PostConstruct;
35 import org.apache.commons.configuration.BaseConfiguration;
36 import org.apache.tinkerpop.gremlin.structure.T;
37 import org.apache.tinkerpop.gremlin.structure.Vertex;
38 import org.openecomp.sdc.be.config.BeEcompErrorManager;
39 import org.openecomp.sdc.be.config.ConfigurationManager;
40 import org.openecomp.sdc.be.dao.DAOTitanStrategy;
41 import org.openecomp.sdc.be.dao.TitanClientStrategy;
42 import org.openecomp.sdc.be.dao.neo4j.GraphPropertiesDictionary;
43 import org.openecomp.sdc.common.config.EcompErrorName;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import org.springframework.stereotype.Component;
48 import com.thinkaurelius.titan.core.InvalidElementException;
49 import com.thinkaurelius.titan.core.InvalidIDException;
50 import com.thinkaurelius.titan.core.PropertyKey;
51 import com.thinkaurelius.titan.core.QueryException;
52 import com.thinkaurelius.titan.core.SchemaViolationException;
53 import com.thinkaurelius.titan.core.TitanConfigurationException;
54 import com.thinkaurelius.titan.core.TitanFactory;
55 import com.thinkaurelius.titan.core.TitanGraph;
56 import com.thinkaurelius.titan.core.TitanVertex;
57 import com.thinkaurelius.titan.core.TitanVertexProperty;
58 import com.thinkaurelius.titan.core.schema.ConsistencyModifier;
59 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
60 import com.thinkaurelius.titan.core.schema.TitanManagement;
61 import com.thinkaurelius.titan.core.util.TitanCleanup;
62 import com.thinkaurelius.titan.diskstorage.ResourceUnavailableException;
63 import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
64 import com.thinkaurelius.titan.graphdb.database.idassigner.IDPoolExhaustedException;
66 import fj.data.Either;
69 @Component("titan-client")
70 public class TitanGraphClient {
// Loggers: a general one named after this class and a dedicated "titan.healthcheck" logger.
72 private static Logger logger = LoggerFactory.getLogger(TitanGraphClient.class.getName());
73 private static Logger healthLogger = LoggerFactory.getLogger("titan.healthcheck");
// Property name and expected value used to find the health-check vertex in the graph.
75 private static final String HEALTH_CHECK = GraphPropertiesDictionary.HEALTH_CHECK.getProperty();
76 private static final String OK = "GOOD";
// No-argument constructor; unlike the TitanClientStrategy constructor it receives no strategy.
78 public TitanGraphClient() {
// Callable used for the health check: looks up the health-check vertex and stamps it with the current time.
81 private class HealthCheckTask implements Callable<Vertex> {
83 public Vertex call() {
// Take the first vertex whose HEALTH_CHECK property equals OK.
85 TitanVertex v = (TitanVertex) graph.query().has(HEALTH_CHECK, OK).vertices().iterator().next();
// Write OK + "_" + <current time in millis> to the "healthcheck" property of that vertex.
86 TitanVertexProperty<String> property = v.property("healthcheck", OK + "_" + System.currentTimeMillis());
87 healthLogger.trace("Health Check Node Found...{}", v.property(HEALTH_CHECK));
// The commented-out lines below are an older variant of the same lookup/update.
90 // Vertex v = graph.getVertices(HEALTH_CHECK, OK).iterator().next();
91 // v.setProperty("healthcheck", OK + "_" +
92 // System.currentTimeMillis());
94 // healthLogger.trace("Health Check Node
95 // Found..."+v.getProperty(HEALTH_CHECK) );
// Runnable that probes the graph via isGraphOpen() and updates lastHealthState when the result changes.
100 private class HealthCheckScheduledTask implements Runnable {
103 healthLogger.trace("Executing TITAN Health Check Task - Start");
104 boolean healthStatus = isGraphOpen();
105 healthLogger.trace("Executing TITAN Health Check Task - Status = {}", healthStatus);
// React only to a transition between healthy and unhealthy.
106 if (healthStatus != lastHealthState) {
107 logger.trace("TITAN Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus);
108 lastHealthState = healthStatus;
// Runnable that retries opening the graph from the stored configuration file path.
114 private class ReconnectTask implements Runnable {
117 logger.trace("Trying to reconnect to Titan...");
119 createGraph(titanCfgFile);
// Graph handle; assigned by the createGraph methods.
124 private TitanGraph graph;
126 // Health Check Variables
129 * This executor will execute the health check task on a callable task that can be executed with a timeout.
131 ExecutorService healthCheckExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
133 public Thread newThread(Runnable r) {
134 return new Thread(r, "Titan-Health-Check-Thread");
// Timeout, in seconds, used by isGraphOpen() when waiting for the health-check result.
137 private long healthCheckReadTimeout = 2;
138 HealthCheckTask healthCallableTask = new HealthCheckTask();
139 HealthCheckScheduledTask healthCheckScheduledTask = new HealthCheckScheduledTask();
// Result of the most recent health check; returned by getHealth().
140 boolean lastHealthState = false;
142 // Reconnection variables
143 private ScheduledExecutorService reconnectScheduler = null;
144 private ScheduledExecutorService healthCheckScheduler = null;
145 private Runnable reconnectTask = null;
// Period, in seconds, used when scheduling both the reconnect task and the health-check task.
146 private long reconnectInterval = 3;
147 @SuppressWarnings("rawtypes")
// Handle of the scheduled reconnect task; cancelled in onGraphOpened().
148 private Future reconnectFuture;
// Path of the Titan configuration file; set in createGraph().
150 private String titanCfgFile = null;
// Provides the configuration file path through getConfigFile().
151 TitanClientStrategy titanClientStrategy;
// Creates a client bound to the given strategy, prepares the health-check scheduler and reads the
// health-check timeout and reconnect interval from the configuration (defaults 2 and 3).
153 public TitanGraphClient(TitanClientStrategy titanClientStrategy) {
155 this.titanClientStrategy = titanClientStrategy;
157 // Initialize a single threaded scheduler for health-check
158 this.healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
160 public Thread newThread(Runnable r) {
161 return new Thread(r, "Titan-Health-Check-Task");
// The literal arguments are passed as defaults to the configuration getters.
165 healthCheckReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getTitanHealthCheckReadTimeout(2);
166 reconnectInterval = ConfigurationManager.getConfigurationManager().getConfiguration().getTitanReconnectIntervalInSeconds(3);
168 logger.info("** TitanGraphClient created");
// Opens the graph: an in-memory graph when the configuration asks for one, otherwise a graph
// opened from the strategy's configuration file.
172 public TitanOperationStatus createGraph() {
174 logger.info("** createGraph started **");
// In-memory mode: open Titan with the "inmemory" storage backend and report OK.
176 if (ConfigurationManager.getConfigurationManager().getConfiguration().getTitanInMemoryGraph()) {
177 BaseConfiguration conf = new BaseConfiguration();
178 conf.setProperty("storage.backend", "inmemory");
179 graph = TitanFactory.open(conf);
181 logger.info("** in memory graph created");
182 return TitanOperationStatus.OK;
// File-based mode: fall back to "config/titan.properties" when the strategy gives no path.
184 this.titanCfgFile = titanClientStrategy.getConfigFile();
185 if (titanCfgFile == null || titanCfgFile.isEmpty()) {
186 titanCfgFile = "config/titan.properties";
190 // In case connection failed on init time, schedule a reconnect task
192 TitanOperationStatus status = createGraph(titanCfgFile);
193 logger.debug("Create Titan graph status {}", status);
194 if (status != TitanOperationStatus.OK) {
195 this.startReconnectTask();
202 private void startHealthCheckTask() {
203 this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, reconnectInterval, TimeUnit.SECONDS);
207 * This method will be invoked ONLY on init time in case Titan storage is down.
209 private void startReconnectTask() {
210 this.reconnectTask = new ReconnectTask();
211 // Initialize a single threaded scheduler
212 this.reconnectScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
214 public Thread newThread(Runnable r) {
215 return new Thread(r, "Titan-Reconnect-Task");
219 logger.info("Scheduling reconnect task {} with interval of {} seconds", reconnectTask, reconnectInterval);
220 reconnectFuture = this.reconnectScheduler.scheduleAtFixedRate(this.reconnectTask, 0, this.reconnectInterval, TimeUnit.SECONDS);
// Clears the graph by handing it to TitanCleanup.clear.
223 public void cleanupGraph() {
227 TitanCleanup.clear(graph);
231 private boolean graphInitialized(){
232 TitanManagement graphMgmt = graph.openManagement();
233 return graphMgmt.containsPropertyKey(HEALTH_CHECK) && graphMgmt.containsGraphIndex(HEALTH_CHECK);
// Opens the graph from the given Titan configuration file and runs the post-open actions.
// Visible outcomes: NOT_CREATED when the opened graph is closed or not initialized,
// NOT_CONNECTED when opening throws, OK otherwise.
237 public TitanOperationStatus createGraph(String titanCfgFile) {
238 logger.info("** open graph with {} started", titanCfgFile);
240 logger.info("openGraph : try to load file {}", titanCfgFile);
241 graph = TitanFactory.open(titanCfgFile);
// A graph without the health-check schema elements (see graphInitialized) is treated as not created.
242 if (graph.isClosed() || !graphInitialized()) {
243 logger.error("titan graph was not initialized");
244 return TitanOperationStatus.NOT_CREATED;
247 } catch (Exception e) {
// The exception is passed as the last argument so the logger records its stack trace.
249 logger.info("createGraph : failed to open Titan graph with configuration file: {}", titanCfgFile, e);
250 return TitanOperationStatus.NOT_CONNECTED;
253 logger.info("** Titan graph created ");
255 // Do some post creation actions
256 this.onGraphOpened();
258 return TitanOperationStatus.OK;
// Post-open actions: stop a pending reconnect task, make sure the health-check vertex exists,
// and start the periodic health check.
261 private void onGraphOpened() {
262 // if a reconnect task is running, cancel it.
263 if (this.reconnectFuture != null) {
264 logger.info("** Cancelling Titan reconnect task");
// cancel(true) also requests interruption of a run that is in progress.
265 reconnectFuture.cancel(true);
268 // create health-check node
// Create the vertex only when no vertex with HEALTH_CHECK == OK is found.
269 if (!graph.query().has(HEALTH_CHECK, OK).vertices().iterator().hasNext()) {
270 logger.trace("Healthcheck Singleton node does not exist, Creating healthcheck node...");
271 Vertex healthCheckNode = graph.addVertex();
272 healthCheckNode.property(HEALTH_CHECK, OK);
273 logger.trace("Healthcheck node created successfully. ID={}", healthCheckNode.property(T.id.getAccessor()));
276 logger.trace("Skipping Healthcheck Singleton node creation. Already exist...");
278 this.startHealthCheckTask();
// Returns the graph as the left value, or TitanOperationStatus.NOT_CREATED as the right value.
282 public Either<TitanGraph, TitanOperationStatus> getGraph() {
284 return Either.left(graph);
286 return Either.right(TitanOperationStatus.NOT_CREATED);
// Returns OK on the success path; an exception is translated through handleTitanException;
// NOT_CREATED is the remaining outcome.
290 public TitanOperationStatus commit() {
294 return TitanOperationStatus.OK;
295 } catch (Exception e) {
296 return handleTitanException(e);
299 return TitanOperationStatus.NOT_CREATED;
// Rolls back the graph's current transaction and returns OK; an exception is translated
// through handleTitanException; NOT_CREATED is the remaining outcome.
303 public TitanOperationStatus rollback() {
307 graph.tx().rollback();
308 return TitanOperationStatus.OK;
309 } catch (Exception e) {
310 return handleTitanException(e);
313 return TitanOperationStatus.NOT_CREATED;
317 public static TitanOperationStatus handleTitanException(Exception e) {
318 if (e instanceof TitanConfigurationException) {
319 return TitanOperationStatus.TITAN_CONFIGURATION;
321 if (e instanceof SchemaViolationException) {
322 return TitanOperationStatus.TITAN_SCHEMA_VIOLATION;
324 if (e instanceof PermanentLockingException) {
325 return TitanOperationStatus.TITAN_SCHEMA_VIOLATION;
327 if (e instanceof IDPoolExhaustedException) {
328 return TitanOperationStatus.GENERAL_ERROR;
330 if (e instanceof InvalidElementException) {
331 return TitanOperationStatus.INVALID_ELEMENT;
333 if (e instanceof InvalidIDException) {
334 return TitanOperationStatus.INVALID_ID;
336 if (e instanceof QueryException) {
337 return TitanOperationStatus.INVALID_QUERY;
339 if (e instanceof ResourceUnavailableException) {
340 return TitanOperationStatus.RESOURCE_UNAVAILABLE;
342 if (e instanceof IllegalArgumentException) {
343 // TODO check the error message??
344 return TitanOperationStatus.ILLEGAL_ARGUMENT;
347 return TitanOperationStatus.GENERAL_ERROR;
350 public boolean getHealth() {
351 return this.lastHealthState;
// Health probe: runs the health-check callable on healthCheckExecutor and waits up to
// healthCheckReadTimeout seconds for its result.
354 private boolean isGraphOpen() {
355 healthLogger.trace("Invoking Titan health check ...");
359 Future<Vertex> future = healthCheckExecutor.submit(healthCallableTask);
// Bounded wait: Future.get with a timeout throws if the task does not finish in time.
360 v = future.get(this.healthCheckReadTimeout, TimeUnit.SECONDS);
361 healthLogger.trace("Health Check Node Found... {}", v.property(HEALTH_CHECK));
363 } catch (Exception e) {
// Some exceptions carry no message; log the exception class name in that case.
364 String message = e.getMessage();
365 if (message == null) {
366 message = e.getClass().getName();
368 logger.error("Titan Health Check Failed. {}", message);
// Manual entry point: builds a client with DAOTitanStrategy, creates the graph and prints
// the health-check result to stderr.
378 public static void main(String[] args) throws InterruptedException {
379 TitanGraphClient client = new TitanGraphClient(new DAOTitanStrategy());
380 client.createGraph();
383 boolean health = client.isGraphOpen();
384 System.err.println("health=" + health);
// Identifier passed to BeEcompErrorManager by logAlarm().
391 private static final String TITAN_HEALTH_CHECK_STR = "titanHealthCheck";
393 private void logAlarm() {
394 if (lastHealthState == true) {
395 BeEcompErrorManager.getInstance().logBeHealthCheckTitanRecovery(TITAN_HEALTH_CHECK_STR);
397 BeEcompErrorManager.getInstance().logBeHealthCheckTitanError(TITAN_HEALTH_CHECK_STR);
401 private void createTitanSchema() {
403 TitanManagement graphMgt = graph.openManagement();
404 TitanGraphIndex index = null;
405 for (GraphPropertiesDictionary prop : GraphPropertiesDictionary.values()) {
406 PropertyKey propKey = null;
407 if (!graphMgt.containsPropertyKey(prop.getProperty())) {
408 Class<?> clazz = prop.getClazz();
409 if (!ArrayList.class.getName().equals(clazz.getName()) && !HashMap.class.getName().equals(clazz.getName())) {
410 propKey = graphMgt.makePropertyKey(prop.getProperty()).dataType(prop.getClazz()).make();
413 propKey = graphMgt.getPropertyKey(prop.getProperty());
415 if (prop.isIndexed()) {
416 if (!graphMgt.containsGraphIndex(prop.getProperty())) {
417 if (prop.isUnique()) {
418 index = graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).unique().buildCompositeIndex();
420 graphMgt.setConsistency(propKey, ConsistencyModifier.LOCK); // Ensures
426 graphMgt.setConsistency(index, ConsistencyModifier.LOCK); // Ensures
434 graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).buildCompositeIndex();