/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2022 Nordix Foundation
+ * Copyright (C) 2022 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.cps.spi.utils;
-import java.util.HashMap;
-import java.util.Map;
+import com.google.common.util.concurrent.TimeLimiter;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import jakarta.annotation.PostConstruct;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
import org.hibernate.HibernateException;
+import org.hibernate.LockMode;
import org.hibernate.Session;
-import org.hibernate.SessionException;
-import org.hibernate.SessionFactory;
-import org.hibernate.cfg.Configuration;
+import org.onap.cps.spi.config.CpsSessionFactory;
import org.onap.cps.spi.entities.AnchorEntity;
import org.onap.cps.spi.entities.DataspaceEntity;
-import org.onap.cps.spi.entities.SchemaSetEntity;
-import org.onap.cps.spi.entities.YangResourceEntity;
+import org.onap.cps.spi.exceptions.SessionManagerException;
+import org.onap.cps.spi.exceptions.SessionTimeoutException;
+import org.onap.cps.spi.repository.AnchorRepository;
+import org.onap.cps.spi.repository.DataspaceRepository;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
+@RequiredArgsConstructor
+@Slf4j
@Component
+@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class SessionManager {
- private static SessionFactory sessionFactory;
- private static Map<String, Session> sessionMap = new HashMap<>();
+ private final CpsSessionFactory cpsSessionFactory;
+ private final TimeLimiterProvider timeLimiterProvider;
+ private final DataspaceRepository dataspaceRepository;
+ private final AnchorRepository anchorRepository;
+ private final ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>();
+ public static final boolean WITH_COMMIT = true;
+ public static final boolean WITH_ROLLBACK = false;
- private synchronized void buildSessionFactory() {
- if (sessionFactory == null) {
- sessionFactory = new Configuration().configure("hibernate.cfg.xml")
- .addAnnotatedClass(AnchorEntity.class)
- .addAnnotatedClass(DataspaceEntity.class)
- .addAnnotatedClass(SchemaSetEntity.class)
- .addAnnotatedClass(YangResourceEntity.class)
- .buildSessionFactory();
+ @PostConstruct
+ private void postConstruct() {
+ final Thread shutdownHook = new Thread(this::closeAllSessionsInShutdown);
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ }
+
+ private void closeAllSessionsInShutdown() {
+ for (final String sessionId : sessionMap.keySet()) {
+ try {
+ closeSession(sessionId, WITH_ROLLBACK);
+ log.info("Session with session ID {} rolled back and closed", sessionId);
+ } catch (final Exception e) {
+ log.warn("Session with session ID {} failed to close", sessionId);
+ }
}
+ cpsSessionFactory.closeSessionFactory();
}
/**
* @return Session ID string
*/
public String startSession() {
- buildSessionFactory();
- final Session session = sessionFactory.openSession();
+ final Session session = cpsSessionFactory.openSession();
final String sessionId = UUID.randomUUID().toString();
sessionMap.put(sessionId, session);
session.beginTransaction();
/**
* Close session.
+ * Changes are committed when commit boolean is set to true.
+ * Rollback will execute when commit boolean is set to false.
*
* @param sessionId session ID
+ * @param commit indicator whether session will commit or rollback
*/
- public void closeSession(final String sessionId) {
+ public void closeSession(final String sessionId, final boolean commit) {
try {
- final Session currentSession = sessionMap.get(sessionId);
- currentSession.getTransaction().commit();
- currentSession.close();
- } catch (final NullPointerException e) {
- throw new SessionException(String.format("Session with session ID %s does not exist", sessionId));
+ final Session session = getSession(sessionId);
+ if (commit) {
+ session.getTransaction().commit();
+ } else {
+ session.getTransaction().rollback();
+ }
+ session.close();
} catch (final HibernateException e) {
- throw new SessionException(String.format("Unable to close session with session ID %s", sessionId));
+ throw new SessionManagerException("Cannot close session",
+ String.format("Unable to close session with session ID '%s'", sessionId), e);
+ } finally {
+ sessionMap.remove(sessionId);
+ }
+ }
+
+ /**
+ * Lock Anchor.
+ * To release locks(s), the session holding the lock(s) must be closed.
+ *
+ * @param sessionId session ID
+ * @param dataspaceName dataspace name
+ * @param anchorName anchor name
+ * @param timeoutInMilliseconds lock attempt timeout in milliseconds
+ */
+ @SneakyThrows
+ public void lockAnchor(final String sessionId, final String dataspaceName,
+ final String anchorName, final Long timeoutInMilliseconds) {
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ final TimeLimiter timeLimiter = timeLimiterProvider.getTimeLimiter(executorService);
+
+ try {
+ timeLimiter.callWithTimeout(() -> {
+ applyPessimisticWriteLockOnAnchor(sessionId, dataspaceName, anchorName);
+ return null;
+ }, timeoutInMilliseconds, TimeUnit.MILLISECONDS);
+ } catch (final TimeoutException e) {
+ throw new SessionTimeoutException(
+ "Timeout: Anchor locking failed",
+ "The error could be caused by another session holding a lock on the specified table. "
+ + "Retrying the sending the request could be required.", e);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SessionManagerException("Operation interrupted", "This thread was interrupted.", e);
+ } catch (final ExecutionException | UncheckedExecutionException e) {
+ if (e.getCause() != null) {
+ throw e.getCause();
+ }
+ throw new SessionManagerException(
+ "Operation Aborted",
+ "The transaction request was aborted. "
+ + "Retrying and checking all details are correct could be required", e);
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
+
+ private void applyPessimisticWriteLockOnAnchor(final String sessionId, final String dataspaceName,
+ final String anchorName) {
+ final Session session = getSession(sessionId);
+ final DataspaceEntity dataspaceEntity = dataspaceRepository.getByName(dataspaceName);
+ final AnchorEntity anchorEntity = anchorRepository.getByDataspaceAndName(dataspaceEntity, anchorName);
+ final long anchorId = anchorEntity.getId();
+ log.debug("Attempting to lock anchor {} for session {}", anchorName, sessionId);
+ session.get(AnchorEntity.class, anchorId, LockMode.PESSIMISTIC_WRITE);
+ log.info("Anchor {} successfully locked", anchorName);
+ }
+
+ private Session getSession(final String sessionId) {
+ final Session session = sessionMap.get(sessionId);
+ if (session == null) {
+ throw new SessionManagerException("Session not found",
+ String.format("Session with ID %s does not exist", sessionId));
}
- sessionMap.remove(sessionId);
+ return session;
}
-}
\ No newline at end of file
+}