2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.dao.titan;
23 import com.thinkaurelius.titan.core.*;
24 import com.thinkaurelius.titan.core.schema.ConsistencyModifier;
25 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
26 import com.thinkaurelius.titan.core.schema.TitanManagement;
27 import com.thinkaurelius.titan.core.util.TitanCleanup;
28 import com.thinkaurelius.titan.diskstorage.ResourceUnavailableException;
29 import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
30 import com.thinkaurelius.titan.graphdb.database.idassigner.IDPoolExhaustedException;
31 import fj.data.Either;
32 import org.apache.commons.configuration.BaseConfiguration;
33 import org.apache.tinkerpop.gremlin.structure.T;
34 import org.apache.tinkerpop.gremlin.structure.Vertex;
35 import org.openecomp.sdc.be.config.BeEcompErrorManager;
36 import org.openecomp.sdc.be.config.ConfigurationManager;
37 import org.openecomp.sdc.be.dao.DAOTitanStrategy;
38 import org.openecomp.sdc.be.dao.TitanClientStrategy;
39 import org.openecomp.sdc.be.dao.neo4j.GraphPropertiesDictionary;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import org.springframework.stereotype.Component;
44 import javax.annotation.PostConstruct;
45 import java.util.ArrayList;
46 import java.util.HashMap;
47 import java.util.concurrent.*;
50 @Component("titan-client")
51 public class TitanGraphClient {
52 private static Logger logger = LoggerFactory.getLogger(TitanGraphClient.class.getName());
53 private static Logger healthLogger = LoggerFactory.getLogger("titan.healthcheck");
55 private static final String HEALTH_CHECK = GraphPropertiesDictionary.HEALTH_CHECK.getProperty();
56 private static final String OK = "GOOD";
58 public TitanGraphClient() {
61 private class HealthCheckTask implements Callable<Vertex> {
63 public Vertex call() {
65 TitanVertex v = (TitanVertex) graph.query().has(HEALTH_CHECK, OK).vertices().iterator().next();
66 TitanVertexProperty<String> property = v.property("healthcheck", OK + "_" + System.currentTimeMillis());
67 healthLogger.trace("Health Check Node Found...{}", v.property(HEALTH_CHECK));
74 private class HealthCheckScheduledTask implements Runnable {
77 healthLogger.trace("Executing TITAN Health Check Task - Start");
78 boolean healthStatus = isGraphOpen();
79 healthLogger.trace("Executing TITAN Health Check Task - Status = {}", healthStatus);
80 if (healthStatus != lastHealthState) {
81 logger.trace("TITAN Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus);
82 lastHealthState = healthStatus;
88 private class ReconnectTask implements Runnable {
91 logger.trace("Trying to reconnect to Titan...");
93 createGraph(titanCfgFile);
98 private TitanGraph graph;
100 // Health Check Variables
103 * This executor will execute the health check task on a callable task that can be executed with a timeout.
105 ExecutorService healthCheckExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
107 public Thread newThread(Runnable r) {
108 return new Thread(r, "Titan-Health-Check-Thread");
111 private long healthCheckReadTimeout = 2;
112 HealthCheckTask healthCallableTask = new HealthCheckTask();
113 HealthCheckScheduledTask healthCheckScheduledTask = new HealthCheckScheduledTask();
114 boolean lastHealthState = false;
116 // Reconnection variables
117 private ScheduledExecutorService reconnectScheduler = null;
118 private ScheduledExecutorService healthCheckScheduler = null;
119 private Runnable reconnectTask = null;
120 private long reconnectInterval = 3;
121 @SuppressWarnings("rawtypes")
122 private Future reconnectFuture;
124 private String titanCfgFile = null;
125 TitanClientStrategy titanClientStrategy;
127 public TitanGraphClient(TitanClientStrategy titanClientStrategy) {
129 this.titanClientStrategy = titanClientStrategy;
131 // Initialize a single threaded scheduler for health-check
132 this.healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
134 public Thread newThread(Runnable r) {
135 return new Thread(r, "Titan-Health-Check-Task");
139 healthCheckReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getTitanHealthCheckReadTimeout(2);
140 reconnectInterval = ConfigurationManager.getConfigurationManager().getConfiguration().getTitanReconnectIntervalInSeconds(3);
142 logger.info("** TitanGraphClient created");
146 public TitanOperationStatus createGraph() {
148 logger.info("** createGraph started **");
150 if (ConfigurationManager.getConfigurationManager().getConfiguration().getTitanInMemoryGraph()) {
151 BaseConfiguration conf = new BaseConfiguration();
152 conf.setProperty("storage.backend", "inmemory");
153 graph = TitanFactory.open(conf);
155 logger.info("** in memory graph created");
156 return TitanOperationStatus.OK;
158 this.titanCfgFile = titanClientStrategy.getConfigFile();
159 if (titanCfgFile == null || titanCfgFile.isEmpty()) {
160 titanCfgFile = "config/titan.properties";
164 // In case connection failed on init time, schedule a reconnect task
166 TitanOperationStatus status = createGraph(titanCfgFile);
167 logger.debug("Create Titan graph status {}", status);
168 if (status != TitanOperationStatus.OK) {
169 this.startReconnectTask();
176 private void startHealthCheckTask() {
177 this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, reconnectInterval, TimeUnit.SECONDS);
181 * This method will be invoked ONLY on init time in case Titan storage is down.
183 private void startReconnectTask() {
184 this.reconnectTask = new ReconnectTask();
185 // Initialize a single threaded scheduler
186 this.reconnectScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
188 public Thread newThread(Runnable r) {
189 return new Thread(r, "Titan-Reconnect-Task");
193 logger.info("Scheduling reconnect task {} with interval of {} seconds", reconnectTask, reconnectInterval);
194 reconnectFuture = this.reconnectScheduler.scheduleAtFixedRate(this.reconnectTask, 0, this.reconnectInterval, TimeUnit.SECONDS);
197 public void cleanupGraph() {
201 TitanCleanup.clear(graph);
205 private boolean graphInitialized(){
206 TitanManagement graphMgmt = graph.openManagement();
207 return graphMgmt.containsPropertyKey(HEALTH_CHECK) && graphMgmt.containsGraphIndex(HEALTH_CHECK);
211 public TitanOperationStatus createGraph(String titanCfgFile) {
212 logger.info("** open graph with {} started", titanCfgFile);
214 logger.info("openGraph : try to load file {}", titanCfgFile);
215 graph = TitanFactory.open(titanCfgFile);
216 if (graph.isClosed() || !graphInitialized()) {
217 logger.error("titan graph was not initialized");
218 return TitanOperationStatus.NOT_CREATED;
221 } catch (Exception e) {
223 logger.info("createGraph : failed to open Titan graph with configuration file: {}", titanCfgFile);
224 logger.debug("createGraph : failed with exception.", e);
225 return TitanOperationStatus.NOT_CONNECTED;
228 logger.info("** Titan graph created ");
230 // Do some post creation actions
231 this.onGraphOpened();
233 return TitanOperationStatus.OK;
236 private void onGraphOpened() {
237 // if a reconnect task is running, cancel it.
238 if (this.reconnectFuture != null) {
239 logger.info("** Cancelling Titan reconnect task");
240 reconnectFuture.cancel(true);
243 // create health-check node
244 if (!graph.query().has(HEALTH_CHECK, OK).vertices().iterator().hasNext()) {
245 logger.trace("Healthcheck Singleton node does not exist, Creating healthcheck node...");
246 Vertex healthCheckNode = graph.addVertex();
247 healthCheckNode.property(HEALTH_CHECK, OK);
248 logger.trace("Healthcheck node created successfully. ID={}", healthCheckNode.property(T.id.getAccessor()));
251 logger.trace("Skipping Healthcheck Singleton node creation. Already exist...");
253 this.startHealthCheckTask();
257 public Either<TitanGraph, TitanOperationStatus> getGraph() {
259 return Either.left(graph);
261 return Either.right(TitanOperationStatus.NOT_CREATED);
265 public TitanOperationStatus commit() {
269 return TitanOperationStatus.OK;
270 } catch (Exception e) {
271 return handleTitanException(e);
274 return TitanOperationStatus.NOT_CREATED;
278 public TitanOperationStatus rollback() {
282 graph.tx().rollback();
283 return TitanOperationStatus.OK;
284 } catch (Exception e) {
285 return handleTitanException(e);
288 return TitanOperationStatus.NOT_CREATED;
292 public static TitanOperationStatus handleTitanException(Exception e) {
293 if (e instanceof TitanConfigurationException) {
294 return TitanOperationStatus.TITAN_CONFIGURATION;
296 if (e instanceof SchemaViolationException) {
297 return TitanOperationStatus.TITAN_SCHEMA_VIOLATION;
299 if (e instanceof PermanentLockingException) {
300 return TitanOperationStatus.TITAN_SCHEMA_VIOLATION;
302 if (e instanceof IDPoolExhaustedException) {
303 return TitanOperationStatus.GENERAL_ERROR;
305 if (e instanceof InvalidElementException) {
306 return TitanOperationStatus.INVALID_ELEMENT;
308 if (e instanceof InvalidIDException) {
309 return TitanOperationStatus.INVALID_ID;
311 if (e instanceof QueryException) {
312 return TitanOperationStatus.INVALID_QUERY;
314 if (e instanceof ResourceUnavailableException) {
315 return TitanOperationStatus.RESOURCE_UNAVAILABLE;
317 if (e instanceof IllegalArgumentException) {
318 // TODO check the error message??
319 return TitanOperationStatus.ILLEGAL_ARGUMENT;
322 return TitanOperationStatus.GENERAL_ERROR;
325 public boolean getHealth() {
326 return this.lastHealthState;
329 private boolean isGraphOpen() {
330 healthLogger.trace("Invoking Titan health check ...");
334 Future<Vertex> future = healthCheckExecutor.submit(healthCallableTask);
335 v = future.get(this.healthCheckReadTimeout, TimeUnit.SECONDS);
336 healthLogger.trace("Health Check Node Found... {}", v.property(HEALTH_CHECK));
338 } catch (Exception e) {
339 String message = e.getMessage();
340 if (message == null) {
341 message = e.getClass().getName();
343 logger.error("Titan Health Check Failed. {}", message);
353 public static void main(String[] args) throws InterruptedException {
354 TitanGraphClient client = new TitanGraphClient(new DAOTitanStrategy());
355 client.createGraph();
358 boolean health = client.isGraphOpen();
359 System.err.println("health=" + health);
366 private static final String TITAN_HEALTH_CHECK_STR = "titanHealthCheck";
368 private void logAlarm() {
369 if (lastHealthState) {
370 BeEcompErrorManager.getInstance().logBeHealthCheckTitanRecovery(TITAN_HEALTH_CHECK_STR);
372 BeEcompErrorManager.getInstance().logBeHealthCheckTitanError(TITAN_HEALTH_CHECK_STR);
376 private void createTitanSchema() {
378 TitanManagement graphMgt = graph.openManagement();
379 TitanGraphIndex index = null;
380 for (GraphPropertiesDictionary prop : GraphPropertiesDictionary.values()) {
381 PropertyKey propKey = null;
382 if (!graphMgt.containsPropertyKey(prop.getProperty())) {
383 Class<?> clazz = prop.getClazz();
384 if (!clazz.isAssignableFrom(ArrayList.class) && !clazz.isAssignableFrom(HashMap.class)) {
385 propKey = graphMgt.makePropertyKey(prop.getProperty()).dataType(prop.getClazz()).make();
388 propKey = graphMgt.getPropertyKey(prop.getProperty());
390 if (prop.isIndexed()) {
391 if (!graphMgt.containsGraphIndex(prop.getProperty())) {
392 if (prop.isUnique()) {
393 index = graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).unique().buildCompositeIndex();
394 // Ensures only one name per vertex
395 graphMgt.setConsistency(propKey, ConsistencyModifier.LOCK);
396 // Ensures name uniqueness in the graph
397 graphMgt.setConsistency(index, ConsistencyModifier.LOCK);
400 graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).buildCompositeIndex();