From b086f57f56bf0eb9dab9cf321a0f69aaaae84347 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 30 May 2012 08:37:45 +0200 Subject: Initial Maven Conversion --- .../c2kernel/persistency/TransactionManager.java | 324 +++++++++++++++++++++ 1 file changed, 324 insertions(+) create mode 100644 src/main/java/com/c2kernel/persistency/TransactionManager.java (limited to 'src/main/java/com/c2kernel/persistency/TransactionManager.java') diff --git a/src/main/java/com/c2kernel/persistency/TransactionManager.java b/src/main/java/com/c2kernel/persistency/TransactionManager.java new file mode 100644 index 0000000..d2679a8 --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/TransactionManager.java @@ -0,0 +1,324 @@ +package com.c2kernel.persistency; + +import java.util.ArrayList; +import java.util.HashMap; + +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.entity.agent.JobList; +import com.c2kernel.events.History; +import com.c2kernel.utils.Logger; + +public class TransactionManager { + + HashMap locks; + HashMap> pendingTransactions; + ClusterStorageManager storage; + + public TransactionManager() throws ClusterStorageException { + storage = new ClusterStorageManager(); + locks = new HashMap(); + pendingTransactions = new HashMap>(); + } + + public boolean hasPendingTransactions() + { + return pendingTransactions.size() > 0; + } + + public ClusterStorageManager getDb() { + return storage; + } + + public void close() { + if (pendingTransactions.size() != 0) { + Logger.error("There were pending transactions on shutdown. All changes were lost."); + dumpPendingTransactions(0); + } + Logger.msg("Transaction Manager: Closing storages"); + storage.close(); + } + + public String[] getClusterContents(int sysKey, String path) throws ClusterStorageException { + if (path.startsWith("/") && path.length() > 1) path = path.substring(1); + return storage.getClusterContents(new Integer(sysKey), path); + } + + /** + * Public get method. Required a 'locker' object for a transaction key. + * Checks the transaction table first to see if the caller has uncommitted changes + */ + public C2KLocalObject get(int sysKey, String path, Object locker) + throws ClusterStorageException, + ObjectNotFoundException { + if (path.startsWith("/") && path.length() > 1) path = path.substring(1); + + // deal out top level remote maps, if transactions aren't needed + if (path.indexOf('/') == -1) { + if (path.equals(ClusterStorage.HISTORY) && locker != null) + return new History(sysKey, locker); + if (path.equals(ClusterStorage.JOB) && locker != null) + return new JobList(sysKey, locker); + } + + Integer sysKeyIntObj = new Integer(sysKey); + // check to see if the locker has been modifying this cluster + synchronized(locks) { + if (locks.containsKey(sysKeyIntObj) && locks.get(sysKeyIntObj).equals(locker)) { + ArrayList lockerTransaction = pendingTransactions.get(locker); + for (TransactionEntry thisEntry : lockerTransaction) { + if (sysKey == thisEntry.sysKey.intValue() && path.equals(thisEntry.getPath())) { + if (thisEntry.obj == null) + throw new ClusterStorageException("ClusterStorageManager.get() - Cluster " + path + " has been deleted in " + sysKey + + " but not yet committed"); + return thisEntry.obj; + } + } + } + } + return storage.get(sysKeyIntObj, path); + } + + /** + * Public put method. Manages the transaction table keyed by the object 'locker'. + * If this object is null, transaction support is bypassed (so long as no lock exists on that object). + */ + public void put(int sysKey, C2KLocalObject obj, Object locker) throws ClusterStorageException { + Integer sysKeyIntObj = new Integer(sysKey); + ArrayList lockerTransaction; + String path = ClusterStorage.getPath(obj); + + synchronized(locks) { + // look to see if this object is already locked + if (locks.containsKey(sysKeyIntObj)) { + // if it's this locker, get the transaction list + Object thisLocker = locks.get(sysKeyIntObj); + if (thisLocker.equals(locker)) // retrieve the transaction list + lockerTransaction = pendingTransactions.get(locker); + else // locked by someone else + throw new ClusterStorageException("ClusterStorageManager.get() - Access denied: Object " + sysKeyIntObj + + " has been locked for writing by " + thisLocker); + } + else { // either we are the locker, or there is no locker + if (locker == null) { // non-locking put/delete + storage.put(sysKeyIntObj, obj); + return; + } + else {// initialise the transaction + locks.put(sysKeyIntObj, locker); + lockerTransaction = new ArrayList(); + pendingTransactions.put(locker, lockerTransaction); + } + } + + // create the new entry in the transaction table + TransactionEntry newEntry = new TransactionEntry(sysKeyIntObj, path, obj); + /* equals() in TransactionEntry only compares sysKey and path, so we can use + * contains() in ArrayList to looks for preexisting entries for this cluster + * and overwrite them. + */ + if (lockerTransaction.contains(newEntry)) + lockerTransaction.remove(newEntry); + lockerTransaction.add(newEntry); + } + } + + /** Public delete method. Uses the put method, with null as the object value. + */ + public void remove(int sysKey, String path, Object locker) throws ClusterStorageException { + Integer sysKeyIntObj = new Integer(sysKey); + ArrayList lockerTransaction; + synchronized(locks) { + // look to see if this object is already locked + if (locks.containsKey(sysKeyIntObj)) { + // if it's this locker, get the transaction list + Object thisLocker = locks.get(sysKeyIntObj); + if (thisLocker.equals(locker)) // retrieve the transaction list + lockerTransaction = pendingTransactions.get(locker); + else // locked by someone else + throw new ClusterStorageException("ClusterStorageManager.get() - Access denied: Object " + sysKeyIntObj + + " has been locked for writing by " + thisLocker); + } + else { // either we are the locker, or there is no locker + if (locker == null) { // non-locking put/delete + storage.remove(sysKeyIntObj, path); + return; + } + else {// initialise the transaction + locks.put(sysKeyIntObj, locker); + lockerTransaction = new ArrayList(); + pendingTransactions.put(locker, lockerTransaction); + } + } + + // create the new entry in the transaction table + TransactionEntry newEntry = new TransactionEntry(sysKeyIntObj, path, null); + /* equals() in TransactionEntry only compares sysKey and path, so we can use + * contains() in ArrayList to looks for preexisting entries for this cluster + * and overwrite them. + */ + if (lockerTransaction.contains(newEntry)) + lockerTransaction.remove(newEntry); + lockerTransaction.add(newEntry); + } + } + + /** + * Removes all child objects from the given path + * + * @param sysKey - entity to delete from + * @param path - root path to delete + * @param locker - locking object + * + * @throws ClusterStorageException - when deleting fails + */ + public void removeCluster(int sysKey, String path, Object locker) throws ClusterStorageException { + + String[] children = getClusterContents(sysKey, path); + for (String element : children) + removeCluster(sysKey, path+(path.length()>0?"/":"")+element, locker); + if (children.length==0 && path.indexOf("/") > -1) + remove(sysKey, path, locker); + + } + /** + * Writes all pending changes to the backends. + */ + public void commit(Object locker) { + synchronized(locks) { + ArrayList lockerTransactions = pendingTransactions.get(locker); + HashMap exceptions = new HashMap(); + // quit if no transactions are present; + if (lockerTransactions == null) return; + for (TransactionEntry thisEntry : lockerTransactions) { + try { + if (thisEntry.obj == null) + storage.remove(thisEntry.sysKey, thisEntry.path); + else + storage.put(thisEntry.sysKey, thisEntry.obj); + locks.remove(thisEntry.sysKey); + } catch (Exception e) { + exceptions.put(thisEntry, e); + } + } + pendingTransactions.remove(locker); + if (exceptions.size() > 0) { // oh dear + Logger.error("TransactionManager.commit() - Problems during transaction commit of locker "+locker.toString()+". Database may be in an inconsistent state."); + for (TransactionEntry entry : exceptions.keySet()) { + Exception ex = exceptions.get(entry); + Logger.msg(entry.toString()); + Logger.error(ex); + } + dumpPendingTransactions(0); + Logger.die("Database failure"); + } + + } + } + + /** + * Rolls back all changes sent in the name of 'locker' and unlocks the sysKeys + */ + public void abort(Object locker) { + synchronized(locks) { + if (locks.containsValue(locker)) { + for (Integer thisKey : locks.keySet()) { + if (locks.get(thisKey).equals(locker)) + locks.remove(thisKey); + } + } + pendingTransactions.remove(locker); + } + } + + public void clearCache(int sysKey, String path) { + if (sysKey == -1) + storage.clearCache(); + else if (path == null) + storage.clearCache(new Integer(sysKey)); + else + storage.clearCache(new Integer(sysKey), path); + + } + + public void dumpPendingTransactions(int logLevel) { + Logger.msg(logLevel, "================"); + Logger.msg(logLevel, "Transaction dump"); + Logger.msg(logLevel, "Locked Items:"); + if (locks.size() == 0) + Logger.msg(logLevel, " None"); + else + for (Integer thisKey : locks.keySet()) { + Object locker = locks.get(thisKey); + Logger.msg(logLevel, " "+thisKey+" locked by "+locker); + } + + Logger.msg(logLevel, "Open transactions:"); + if (pendingTransactions.size() == 0) + Logger.msg(logLevel, " None"); + else + for (Object thisLocker : pendingTransactions.keySet()) { + Logger.msg(logLevel, " Transaction owner:"+thisLocker); + ArrayList entries = pendingTransactions.get(thisLocker); + for (TransactionEntry thisEntry : entries) { + Logger.msg(logLevel, " "+thisEntry.toString()); + } + } + } + + /** Used in the transaction table to store details of a put until commit + */ + class TransactionEntry { + public Integer sysKey; + public String path; + public C2KLocalObject obj; + public TransactionEntry(Integer sysKey, String path, C2KLocalObject obj) { + this.sysKey = sysKey; + this.path = path; + this.obj = obj; + } + + public String getPath() { + return ClusterStorage.getPath(obj); + } + + @Override + public String toString() { + StringBuffer report = new StringBuffer(); + if (obj == null) + report.append("Delete"); + else + report.append("Put "+obj.getClass().getName()); + report.append(" at ").append(path).append(" in ").append(sysKey); + return report.toString(); + + } + /** + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + return sysKey.hashCode()*getPath().hashCode(); + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object other) { + if (other instanceof TransactionEntry) + return hashCode() == ((TransactionEntry)other).hashCode(); + return false; + } + + } + + public Object query(String id, Object query) throws ClusterStorageException { + return storage.query(id, query); + } + + public String queryToXML(String id, String query, boolean genericFormat) throws ClusterStorageException { + return storage.queryToXML(id, query, genericFormat); + } + +} -- cgit v1.2.3