2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.dao.titan;
23 import com.thinkaurelius.titan.core.*;
24 import com.thinkaurelius.titan.core.schema.ConsistencyModifier;
25 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
26 import com.thinkaurelius.titan.core.schema.TitanManagement;
27 import com.thinkaurelius.titan.core.util.TitanCleanup;
28 import com.thinkaurelius.titan.diskstorage.ResourceUnavailableException;
29 import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
30 import com.thinkaurelius.titan.graphdb.database.idassigner.IDPoolExhaustedException;
31 import fj.data.Either;
32 import org.apache.commons.configuration.BaseConfiguration;
33 import org.apache.tinkerpop.gremlin.structure.T;
34 import org.apache.tinkerpop.gremlin.structure.Vertex;
35 import org.openecomp.sdc.be.config.BeEcompErrorManager;
36 import org.openecomp.sdc.be.config.ConfigurationManager;
37 import org.openecomp.sdc.be.dao.DAOTitanStrategy;
38 import org.openecomp.sdc.be.dao.TitanClientStrategy;
39 import org.openecomp.sdc.be.dao.neo4j.GraphPropertiesDictionary;
40 import org.openecomp.sdc.common.log.wrappers.Logger;
41 import org.springframework.stereotype.Component;
43 import javax.annotation.PostConstruct;
44 import java.util.ArrayList;
45 import java.util.HashMap;
46 import java.util.concurrent.*;
49 @Component("titan-client")
50 public class TitanGraphClient {
52 private static Logger logger = Logger.getLogger(TitanGraphClient.class.getName());
53 private static Logger healthLogger = Logger.getLogger("titan.healthcheck");
55 private static final String HEALTH_CHECK = GraphPropertiesDictionary.HEALTH_CHECK.getProperty();
56 private static final String OK = "GOOD";
58 public TitanGraphClient() {
61 private class HealthCheckTask implements Callable<Vertex> {
63 public Vertex call() {
65 TitanVertex v = (TitanVertex) graph.query().has(HEALTH_CHECK, OK).vertices().iterator().next();
66 TitanVertexProperty<String> property = v.property("healthcheck", OK + "_" + System.currentTimeMillis());
67 healthLogger.trace("Health Check Node Found...{}", v.property(HEALTH_CHECK));
70 // Vertex v = graph.getVertices(HEALTH_CHECK, OK).iterator().next();
71 // v.setProperty("healthcheck", OK + "_" +
72 // System.currentTimeMillis());
74 // healthLogger.trace("Health Check Node
75 // Found..."+v.getProperty(HEALTH_CHECK) );
80 private class HealthCheckScheduledTask implements Runnable {
83 healthLogger.trace("Executing TITAN Health Check Task - Start");
84 boolean healthStatus = isGraphOpen();
85 healthLogger.trace("Executing TITAN Health Check Task - Status = {}", healthStatus);
86 if (healthStatus != lastHealthState) {
87 logger.trace("TITAN Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus);
88 lastHealthState = healthStatus;
94 private class ReconnectTask implements Runnable {
97 logger.trace("Trying to reconnect to Titan...");
99 createGraph(titanCfgFile);
104 private TitanGraph graph;
106 // Health Check Variables
109 * This executor will execute the health check task on a callable task that can be executed with a timeout.
111 ExecutorService healthCheckExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
113 public Thread newThread(Runnable r) {
114 return new Thread(r, "Titan-Health-Check-Thread");
117 private long healthCheckReadTimeout = 2;
118 HealthCheckTask healthCallableTask = new HealthCheckTask();
119 HealthCheckScheduledTask healthCheckScheduledTask = new HealthCheckScheduledTask();
120 boolean lastHealthState = false;
122 // Reconnection variables
123 private ScheduledExecutorService reconnectScheduler = null;
124 private ScheduledExecutorService healthCheckScheduler = null;
125 private Runnable reconnectTask = null;
126 private long reconnectInterval = 3;
127 @SuppressWarnings("rawtypes")
128 private Future reconnectFuture;
130 private String titanCfgFile = null;
131 TitanClientStrategy titanClientStrategy;
/**
 * Creates a client whose Titan configuration file is resolved through the given strategy.
 *
 * <p>Besides storing the strategy, this sets up the single-threaded scheduler used for
 * the periodic health check and reads the health-check read timeout and the reconnect
 * interval from the {@link ConfigurationManager}.
 *
 * @param titanClientStrategy strategy later asked for the configuration file location
 */
public TitanGraphClient(TitanClientStrategy titanClientStrategy) {
    this.titanClientStrategy = titanClientStrategy;

    // The health-check scheduler gets one dedicated, recognizably named thread.
    ThreadFactory healthCheckThreads = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable work) {
            return new Thread(work, "Titan-Health-Check-Task");
        }
    };
    this.healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(healthCheckThreads);

    // Tunables come from the application configuration; 2 and 3 are the arguments
    // handed to the respective getters.
    this.healthCheckReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getTitanHealthCheckReadTimeout(2);
    this.reconnectInterval = ConfigurationManager.getConfigurationManager().getConfiguration().getTitanReconnectIntervalInSeconds(3);

    logger.info("** TitanGraphClient created");
}
152 public TitanOperationStatus createGraph() {
154 logger.info("** createGraph started **");
156 if (ConfigurationManager.getConfigurationManager().getConfiguration().getTitanInMemoryGraph()) {
157 BaseConfiguration conf = new BaseConfiguration();
158 conf.setProperty("storage.backend", "inmemory");
159 graph = TitanFactory.open(conf);
161 logger.info("** in memory graph created");
162 return TitanOperationStatus.OK;
164 this.titanCfgFile = titanClientStrategy.getConfigFile();
165 if (titanCfgFile == null || titanCfgFile.isEmpty()) {
166 titanCfgFile = "config/titan.properties";
170 // In case connection failed on init time, schedule a reconnect task
172 TitanOperationStatus status = createGraph(titanCfgFile);
173 logger.debug("Create Titan graph status {}", status);
174 if (status != TitanOperationStatus.OK) {
175 this.startReconnectTask();
182 private void startHealthCheckTask() {
183 this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, reconnectInterval, TimeUnit.SECONDS);
187 * This method will be invoked ONLY on init time in case Titan storage is down.
189 private void startReconnectTask() {
190 this.reconnectTask = new ReconnectTask();
191 // Initialize a single threaded scheduler
192 this.reconnectScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
194 public Thread newThread(Runnable r) {
195 return new Thread(r, "Titan-Reconnect-Task");
199 logger.info("Scheduling reconnect task {} with interval of {} seconds", reconnectTask, reconnectInterval);
200 reconnectFuture = this.reconnectScheduler.scheduleAtFixedRate(this.reconnectTask, 0, this.reconnectInterval, TimeUnit.SECONDS);
203 public void cleanupGraph() {
207 TitanCleanup.clear(graph);
211 private boolean graphInitialized(){
212 TitanManagement graphMgmt = graph.openManagement();
213 return graphMgmt.containsPropertyKey(HEALTH_CHECK) && graphMgmt.containsGraphIndex(HEALTH_CHECK);
217 public TitanOperationStatus createGraph(String titanCfgFile) {
218 logger.info("** open graph with {} started", titanCfgFile);
220 logger.info("openGraph : try to load file {}", titanCfgFile);
221 graph = TitanFactory.open(titanCfgFile);
222 if (graph.isClosed() || !graphInitialized()) {
223 logger.error("titan graph was not initialized");
224 return TitanOperationStatus.NOT_CREATED;
227 } catch (Exception e) {
229 logger.info("createGraph : failed to open Titan graph with configuration file: {}", titanCfgFile);
230 logger.debug("createGraph : failed with exception.", e);
231 return TitanOperationStatus.NOT_CONNECTED;
234 logger.info("** Titan graph created ");
236 // Do some post creation actions
237 this.onGraphOpened();
239 return TitanOperationStatus.OK;
242 private void onGraphOpened() {
243 // if a reconnect task is running, cancel it.
244 if (this.reconnectFuture != null) {
245 logger.info("** Cancelling Titan reconnect task");
246 reconnectFuture.cancel(true);
249 // create health-check node
250 if (!graph.query().has(HEALTH_CHECK, OK).vertices().iterator().hasNext()) {
251 logger.trace("Healthcheck Singleton node does not exist, Creating healthcheck node...");
252 Vertex healthCheckNode = graph.addVertex();
253 healthCheckNode.property(HEALTH_CHECK, OK);
254 logger.trace("Healthcheck node created successfully. ID={}", healthCheckNode.property(T.id.getAccessor()));
257 logger.trace("Skipping Healthcheck Singleton node creation. Already exist...");
259 this.startHealthCheckTask();
263 public Either<TitanGraph, TitanOperationStatus> getGraph() {
265 return Either.left(graph);
267 return Either.right(TitanOperationStatus.NOT_CREATED);
271 public TitanOperationStatus commit() {
275 return TitanOperationStatus.OK;
276 } catch (Exception e) {
277 return handleTitanException(e);
280 return TitanOperationStatus.NOT_CREATED;
284 public TitanOperationStatus rollback() {
288 graph.tx().rollback();
289 return TitanOperationStatus.OK;
290 } catch (Exception e) {
291 return handleTitanException(e);
294 return TitanOperationStatus.NOT_CREATED;
298 public static TitanOperationStatus handleTitanException(Exception e) {
299 if (e instanceof TitanConfigurationException) {
300 return TitanOperationStatus.TITAN_CONFIGURATION;
302 if (e instanceof SchemaViolationException) {
303 return TitanOperationStatus.TITAN_SCHEMA_VIOLATION;
305 if (e instanceof PermanentLockingException) {
306 return TitanOperationStatus.TITAN_SCHEMA_VIOLATION;
308 if (e instanceof IDPoolExhaustedException) {
309 return TitanOperationStatus.GENERAL_ERROR;
311 if (e instanceof InvalidElementException) {
312 return TitanOperationStatus.INVALID_ELEMENT;
314 if (e instanceof InvalidIDException) {
315 return TitanOperationStatus.INVALID_ID;
317 if (e instanceof QueryException) {
318 return TitanOperationStatus.INVALID_QUERY;
320 if (e instanceof ResourceUnavailableException) {
321 return TitanOperationStatus.RESOURCE_UNAVAILABLE;
323 if (e instanceof IllegalArgumentException) {
324 // TODO check the error message??
325 return TitanOperationStatus.ILLEGAL_ARGUMENT;
328 return TitanOperationStatus.GENERAL_ERROR;
331 public boolean getHealth() {
332 return this.lastHealthState;
335 private boolean isGraphOpen() {
336 healthLogger.trace("Invoking Titan health check ...");
340 Future<Vertex> future = healthCheckExecutor.submit(healthCallableTask);
341 v = future.get(this.healthCheckReadTimeout, TimeUnit.SECONDS);
342 healthLogger.trace("Health Check Node Found... {}", v.property(HEALTH_CHECK));
344 } catch (Exception e) {
345 String message = e.getMessage();
346 if (message == null) {
347 message = e.getClass().getName();
349 logger.error("Titan Health Check Failed. {}", message);
359 public static void main(String[] args) throws InterruptedException {
360 TitanGraphClient client = new TitanGraphClient(new DAOTitanStrategy());
361 client.createGraph();
364 boolean health = client.isGraphOpen();
365 System.err.println("health=" + health);
372 private static final String TITAN_HEALTH_CHECK_STR = "titanHealthCheck";
374 private void logAlarm() {
375 if (lastHealthState) {
376 BeEcompErrorManager.getInstance().logBeHealthCheckTitanRecovery(TITAN_HEALTH_CHECK_STR);
378 BeEcompErrorManager.getInstance().logBeHealthCheckTitanError(TITAN_HEALTH_CHECK_STR);
382 private void createTitanSchema() {
384 TitanManagement graphMgt = graph.openManagement();
385 TitanGraphIndex index = null;
386 for (GraphPropertiesDictionary prop : GraphPropertiesDictionary.values()) {
387 PropertyKey propKey = null;
388 if (!graphMgt.containsPropertyKey(prop.getProperty())) {
389 Class<?> clazz = prop.getClazz();
390 if (!ArrayList.class.getName().equals(clazz.getName()) && !HashMap.class.getName().equals(clazz.getName())) {
391 propKey = graphMgt.makePropertyKey(prop.getProperty()).dataType(prop.getClazz()).make();
394 propKey = graphMgt.getPropertyKey(prop.getProperty());
396 if (prop.isIndexed()) {
397 if (!graphMgt.containsGraphIndex(prop.getProperty())) {
398 if (prop.isUnique()) {
399 index = graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).unique().buildCompositeIndex();
401 graphMgt.setConsistency(propKey, ConsistencyModifier.LOCK); // Ensures
407 graphMgt.setConsistency(index, ConsistencyModifier.LOCK); // Ensures
415 graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).buildCompositeIndex();