From 0ed2c1124cf1b9e49a2ec1fa0126a8df09f9e758 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Tue, 7 Oct 2014 09:18:11 +0200 Subject: Repackage to org.cristalise --- .../kernel/persistency/TransactionManager.java | 353 +++++++++++++++++++++ 1 file changed, 353 insertions(+) create mode 100644 src/main/java/org/cristalise/kernel/persistency/TransactionManager.java (limited to 'src/main/java/org/cristalise/kernel/persistency/TransactionManager.java') diff --git a/src/main/java/org/cristalise/kernel/persistency/TransactionManager.java b/src/main/java/org/cristalise/kernel/persistency/TransactionManager.java new file mode 100644 index 0000000..7712da6 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/persistency/TransactionManager.java @@ -0,0 +1,353 @@ +/** + * This file is part of the CRISTAL-iSE kernel. + * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved. + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; either version 3 of the License, or (at + * your option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * + * http://www.fsf.org/licensing/licenses/lgpl.html + */ +package org.cristalise.kernel.persistency; + +import java.util.ArrayList; +import java.util.HashMap; + +import org.cristalise.kernel.common.ObjectNotFoundException; +import org.cristalise.kernel.common.PersistencyException; +import org.cristalise.kernel.entity.C2KLocalObject; +import org.cristalise.kernel.entity.agent.JobList; +import org.cristalise.kernel.events.History; +import org.cristalise.kernel.lookup.ItemPath; +import org.cristalise.kernel.process.auth.Authenticator; +import org.cristalise.kernel.utils.Logger; + + +public class TransactionManager { + + HashMap locks; + HashMap> pendingTransactions; + ClusterStorageManager storage; + + public TransactionManager(Authenticator auth) throws PersistencyException { + storage = new ClusterStorageManager(auth); + locks = new HashMap(); + pendingTransactions = new HashMap>(); + } + + public boolean hasPendingTransactions() + { + return pendingTransactions.size() > 0; + } + + public ClusterStorageManager getDb() { + return storage; + } + + public void close() { + if (pendingTransactions.size() != 0) { + Logger.error("There were pending transactions on shutdown. All changes were lost."); + dumpPendingTransactions(0); + } + Logger.msg("Transaction Manager: Closing storages"); + storage.close(); + } + + public String[] getClusterContents(ItemPath itemPath, String path) throws PersistencyException { + if (path.startsWith("/") && path.length() > 1) path = path.substring(1); + return storage.getClusterContents(itemPath, path); + } + + /** + * Public get method. Required a 'locker' object for a transaction key. + * Checks the transaction table first to see if the caller has uncommitted changes + */ + public C2KLocalObject get(ItemPath itemPath, String path, Object locker) + throws PersistencyException, + ObjectNotFoundException { + if (path.startsWith("/") && path.length() > 1) path = path.substring(1); + + // deal out top level remote maps, if transactions aren't needed + if (path.indexOf('/') == -1) { + if (path.equals(ClusterStorage.HISTORY) && locker != null) + return new History(itemPath, locker); + if (path.equals(ClusterStorage.JOB) && locker != null) + return new JobList(itemPath, locker); + } + + // check to see if the locker has been modifying this cluster + if (locks.containsKey(itemPath) && locks.get(itemPath).equals(locker)) { + ArrayList lockerTransaction = pendingTransactions.get(locker); + for (TransactionEntry thisEntry : lockerTransaction) { + if (itemPath.equals(thisEntry.itemPath) && path.equals(thisEntry.path)) { + if (thisEntry.obj == null) + throw new PersistencyException("ClusterStorageManager.get() - Cluster " + path + " has been deleted in " + itemPath + + " but not yet committed"); + return thisEntry.obj; + } + } + } + return storage.get(itemPath, path); + } + + /** + * Public put method. Manages the transaction table keyed by the object 'locker'. + * If this object is null, transaction support is bypassed (so long as no lock exists on that object). + */ + public void put(ItemPath itemPath, C2KLocalObject obj, Object locker) throws PersistencyException { + Object tempLocker = null; + ArrayList lockerTransaction; + + synchronized(locks) { + // look to see if this object is already locked + if (locks.containsKey(itemPath)) { + // if it's this locker, get the transaction list + Object thisLocker = locks.get(itemPath); + if (thisLocker.equals(locker)) // retrieve the transaction list + lockerTransaction = pendingTransactions.get(locker); + else // locked by someone else + throw new PersistencyException("ClusterStorageManager.get() - Access denied: Object " + itemPath + + " has been locked for writing by " + thisLocker); + } + else { // no locks for this item + if (locker == null) { // lock the item until the non-transactional put is complete :/ + tempLocker = new Object(); + locks.put(itemPath, tempLocker); + lockerTransaction = null; + } + else { // initialise the transaction + locks.put(itemPath, locker); + lockerTransaction = new ArrayList(); + pendingTransactions.put(locker, lockerTransaction); + } + } + } + + if (tempLocker != null) { // non-locking put/delete + storage.put(itemPath, obj); + locks.remove(itemPath); + return; + } + + // create the new entry in the transaction table + TransactionEntry newEntry = new TransactionEntry(itemPath, obj); + /* equals() in TransactionEntry only compares sysKey and path, so we can use + * contains() in ArrayList to looks for preexisting entries for this cluster + * and overwrite them. + */ + if (lockerTransaction.contains(newEntry)) + lockerTransaction.remove(newEntry); + lockerTransaction.add(newEntry); + } + + /** Public delete method. Uses the put method, with null as the object value. + */ + public void remove(ItemPath itemPath, String path, Object locker) throws PersistencyException { + ArrayList lockerTransaction; + Object tempLocker = null; + synchronized(locks) { + // look to see if this object is already locked + if (locks.containsKey(itemPath)) { + // if it's this locker, get the transaction list + Object thisLocker = locks.get(itemPath); + if (thisLocker.equals(locker)) // retrieve the transaction list + lockerTransaction = pendingTransactions.get(locker); + else // locked by someone else + throw new PersistencyException("ClusterStorageManager.get() - Access denied: Object " + itemPath + + " has been locked for writing by " + thisLocker); + } + else { // either we are the locker, or there is no locker + if (locker == null) { // non-locking put/delete + tempLocker = new Object(); + locks.put(itemPath, tempLocker); + lockerTransaction = null; + } + else {// initialise the transaction + locks.put(itemPath, locker); + lockerTransaction = new ArrayList(); + pendingTransactions.put(locker, lockerTransaction); + } + } + } + + if (tempLocker != null) { + storage.remove(itemPath, path); + locks.remove(itemPath); + return; + } + + // create the new entry in the transaction table + TransactionEntry newEntry = new TransactionEntry(itemPath, path); + /* equals() in TransactionEntry only compares sysKey and path, so we can use + * contains() in ArrayList to looks for preexisting entries for this cluster + * and overwrite them. + */ + if (lockerTransaction.contains(newEntry)) + lockerTransaction.remove(newEntry); + lockerTransaction.add(newEntry); + + } + + /** + * Removes all child objects from the given path + * + * @param sysKey - entity to delete from + * @param path - root path to delete + * @param locker - locking object + * + * @throws PersistencyException - when deleting fails + */ + public void removeCluster(ItemPath itemPath, String path, Object locker) throws PersistencyException { + + String[] children = getClusterContents(itemPath, path); + for (String element : children) + removeCluster(itemPath, path+(path.length()>0?"/":"")+element, locker); + if (children.length==0 && path.indexOf("/") > -1) + remove(itemPath, path, locker); + + } + /** + * Writes all pending changes to the backends. + */ + public void commit(Object locker) { + synchronized(locks) { + ArrayList lockerTransactions = pendingTransactions.get(locker); + HashMap exceptions = new HashMap(); + // quit if no transactions are present; + if (lockerTransactions == null) return; + for (TransactionEntry thisEntry : lockerTransactions) { + try { + if (thisEntry.obj == null) + storage.remove(thisEntry.itemPath, thisEntry.path); + else + storage.put(thisEntry.itemPath, thisEntry.obj); + locks.remove(thisEntry.itemPath); + } catch (Exception e) { + exceptions.put(thisEntry, e); + } + } + pendingTransactions.remove(locker); + if (exceptions.size() > 0) { // oh dear + Logger.error("TransactionManager.commit() - Problems during transaction commit of locker "+locker.toString()+". Database may be in an inconsistent state."); + for (TransactionEntry entry : exceptions.keySet()) { + Exception ex = exceptions.get(entry); + Logger.msg(entry.toString()); + Logger.error(ex); + } + dumpPendingTransactions(0); + Logger.die("Database failure"); + } + + } + } + + /** + * Rolls back all changes sent in the name of 'locker' and unlocks the sysKeys + */ + public void abort(Object locker) { + synchronized(locks) { + if (locks.containsValue(locker)) { + for (ItemPath thisPath : locks.keySet()) { + if (locks.get(thisPath).equals(locker)) + locks.remove(thisPath); + } + } + pendingTransactions.remove(locker); + } + } + + public void clearCache(ItemPath itemPath, String path) { + if (itemPath == null) + storage.clearCache(); + else if (path == null) + storage.clearCache(itemPath); + else + storage.clearCache(itemPath, path); + + } + + public void dumpPendingTransactions(int logLevel) { + Logger.msg(logLevel, "================"); + Logger.msg(logLevel, "Transaction dump"); + Logger.msg(logLevel, "Locked Items:"); + if (locks.size() == 0) + Logger.msg(logLevel, " None"); + else + for (ItemPath thisPath : locks.keySet()) { + Object locker = locks.get(thisPath); + Logger.msg(logLevel, " "+thisPath+" locked by "+locker); + } + + Logger.msg(logLevel, "Open transactions:"); + if (pendingTransactions.size() == 0) + Logger.msg(logLevel, " None"); + else + for (Object thisLocker : pendingTransactions.keySet()) { + Logger.msg(logLevel, " Transaction owner:"+thisLocker); + ArrayList entries = pendingTransactions.get(thisLocker); + for (TransactionEntry thisEntry : entries) { + Logger.msg(logLevel, " "+thisEntry.toString()); + } + } + } + + /** Used in the transaction table to store details of a put until commit + */ + static class TransactionEntry { + public ItemPath itemPath; + public String path; + public C2KLocalObject obj; + public TransactionEntry(ItemPath itemPath, C2KLocalObject obj) { + this.itemPath = itemPath; + this.path = ClusterStorage.getPath(obj); + this.obj = obj; + } + + public TransactionEntry(ItemPath itemPath, String path) { + this.itemPath = itemPath; + this.path = path; + this.obj = null; + } + + @Override + public String toString() { + StringBuffer report = new StringBuffer(); + if (obj == null) + report.append("Delete"); + else + report.append("Put "+obj.getClass().getName()); + report.append(" at ").append(path).append(" in ").append(itemPath); + return report.toString(); + + } + /** + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + return itemPath.hashCode()*path.hashCode(); + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object other) { + if (other instanceof TransactionEntry) + return hashCode() == ((TransactionEntry)other).hashCode(); + return false; + } + + } + +} -- cgit v1.2.3