summaryrefslogtreecommitdiff
path: root/source/com/c2kernel/persistency/TransactionManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'source/com/c2kernel/persistency/TransactionManager.java')
-rwxr-xr-xsource/com/c2kernel/persistency/TransactionManager.java334
1 files changed, 334 insertions, 0 deletions
diff --git a/source/com/c2kernel/persistency/TransactionManager.java b/source/com/c2kernel/persistency/TransactionManager.java
new file mode 100755
index 0000000..252c758
--- /dev/null
+++ b/source/com/c2kernel/persistency/TransactionManager.java
@@ -0,0 +1,334 @@
+package com.c2kernel.persistency;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.entity.C2KLocalObject;
+import com.c2kernel.entity.agent.JobList;
+import com.c2kernel.events.History;
+import com.c2kernel.lookup.InvalidEntityPathException;
+import com.c2kernel.utils.Logger;
+
+public class TransactionManager {
+
+ HashMap locks;
+ HashMap pendingTransactions;
+ ClusterStorageManager storage;
+
+ public TransactionManager() throws ClusterStorageException {
+ storage = new ClusterStorageManager();
+ locks = new HashMap();
+ pendingTransactions = new HashMap();
+ }
+
+ public boolean hasPendingTransactions()
+ {
+ return pendingTransactions.size() > 0;
+ }
+
+ public ClusterStorageManager getDb() {
+ return storage;
+ }
+
+ public void close() {
+ if (pendingTransactions.size() != 0) {
+ Logger.error("There were pending transactions on shutdown. All changes were lost.");
+ dumpPendingTransactions(0);
+ }
+ Logger.msg("Transaction Manager: Closing storages");
+ storage.close();
+ }
+
+ public String[] getClusterContents(int sysKey, String path) throws ClusterStorageException {
+ if (path.startsWith("/") && path.length() > 1) path = path.substring(1);
+ return storage.getClusterContents(new Integer(sysKey), path);
+ }
+
+ /**
+ * Public get method. Required a 'locker' object for a transaction key.
+ * Checks the transaction table first to see if the caller has uncommitted changes
+ */
+ public C2KLocalObject get(int sysKey, String path, Object locker)
+ throws ClusterStorageException,
+ ObjectNotFoundException {
+ if (path.startsWith("/") && path.length() > 1) path = path.substring(1);
+
+ // deal out top level remote maps
+ if (path.indexOf('/') == -1) {
+ try {
+ if (path.equals(ClusterStorage.HISTORY))
+ return new History(sysKey, locker);
+ if (path.equals(ClusterStorage.JOB))
+ return new JobList(sysKey, locker);
+ } catch (InvalidEntityPathException ex) {
+ throw new ObjectNotFoundException("Invalid key");
+ }
+ }
+
+ Integer sysKeyIntObj = new Integer(sysKey);
+ // check to see if the locker has been modifying this cluster
+ synchronized(locks) {
+ if (locks.containsKey(sysKeyIntObj) && locks.get(sysKeyIntObj).equals(locker)) {
+ ArrayList lockerTransaction = (ArrayList)pendingTransactions.get(locker);
+ for (Iterator i = lockerTransaction.iterator(); i.hasNext(); ) {
+ TransactionEntry thisEntry = (TransactionEntry)i.next();
+ if (sysKey == thisEntry.sysKey.intValue() && path.equals(thisEntry.getPath())) {
+ if (thisEntry.obj == null)
+ throw new ClusterStorageException("ClusterStorageManager.get() - Cluster " + path + " has been deleted in " + sysKey +
+ " but not yet committed");
+ return thisEntry.obj;
+ }
+ }
+ }
+ }
+ return storage.get(sysKeyIntObj, path);
+ }
+
+ /**
+ * Public put method. Manages the transaction table keyed by the object 'locker'.
+ * If this object is null, transaction support is bypassed (so long as no lock exists on that object).
+ */
+ public void put(int sysKey, C2KLocalObject obj, Object locker) throws ClusterStorageException {
+ Integer sysKeyIntObj = new Integer(sysKey);
+ ArrayList lockerTransaction;
+ String path = ClusterStorage.getPath(obj);
+
+ synchronized(locks) {
+ // look to see if this object is already locked
+ if (locks.containsKey(sysKeyIntObj)) {
+ // if it's this locker, get the transaction list
+ Object thisLocker = locks.get(sysKeyIntObj);
+ if (thisLocker.equals(locker)) // retrieve the transaction list
+ lockerTransaction = (ArrayList)pendingTransactions.get(locker);
+ else // locked by someone else
+ throw new ClusterStorageException("ClusterStorageManager.get() - Access denied: Object " + sysKeyIntObj +
+ " has been locked for writing by " + thisLocker);
+ }
+ else { // either we are the locker, or there is no locker
+ if (locker == null) { // non-locking put/delete
+ storage.put(sysKeyIntObj, obj);
+ return;
+ }
+ else {// initialise the transaction
+ locks.put(sysKeyIntObj, locker);
+ lockerTransaction = new ArrayList();
+ pendingTransactions.put(locker, lockerTransaction);
+ }
+ }
+
+ // create the new entry in the transaction table
+ TransactionEntry newEntry = new TransactionEntry(sysKeyIntObj, path, obj);
+ /* equals() in TransactionEntry only compares sysKey and path, so we can use
+ * contains() in ArrayList to looks for preexisting entries for this cluster
+ * and overwrite them.
+ */
+ if (lockerTransaction.contains(newEntry))
+ lockerTransaction.remove(newEntry);
+ lockerTransaction.add(newEntry);
+ }
+ }
+
+ /** Public delete method. Uses the put method, with null as the object value.
+ */
+ public void remove(int sysKey, String path, Object locker) throws ClusterStorageException {
+ Integer sysKeyIntObj = new Integer(sysKey);
+ ArrayList lockerTransaction;
+ synchronized(locks) {
+ // look to see if this object is already locked
+ if (locks.containsKey(sysKeyIntObj)) {
+ // if it's this locker, get the transaction list
+ Object thisLocker = locks.get(sysKeyIntObj);
+ if (thisLocker.equals(locker)) // retrieve the transaction list
+ lockerTransaction = (ArrayList)pendingTransactions.get(locker);
+ else // locked by someone else
+ throw new ClusterStorageException("ClusterStorageManager.get() - Access denied: Object " + sysKeyIntObj +
+ " has been locked for writing by " + thisLocker);
+ }
+ else { // either we are the locker, or there is no locker
+ if (locker == null) { // non-locking put/delete
+ storage.remove(sysKeyIntObj, path);
+ return;
+ }
+ else {// initialise the transaction
+ locks.put(sysKeyIntObj, locker);
+ lockerTransaction = new ArrayList();
+ pendingTransactions.put(locker, lockerTransaction);
+ }
+ }
+
+ // create the new entry in the transaction table
+ TransactionEntry newEntry = new TransactionEntry(sysKeyIntObj, path, null);
+ /* equals() in TransactionEntry only compares sysKey and path, so we can use
+ * contains() in ArrayList to looks for preexisting entries for this cluster
+ * and overwrite them.
+ */
+ if (lockerTransaction.contains(newEntry))
+ lockerTransaction.remove(newEntry);
+ lockerTransaction.add(newEntry);
+ }
+ }
+
+ /**
+ * Removes all child objects from the given path
+ *
+ * @param sysKey - entity to delete from
+ * @param path - root path to delete
+ * @param locker - locking object
+ *
+ * @throws ClusterStorageException - when deleting fails
+ */
+ public void removeCluster(int sysKey, String path, Object locker) throws ClusterStorageException {
+
+ String[] children = getClusterContents(sysKey, path);
+ for (int i = 0; i < children.length; i++)
+ removeCluster(sysKey, path+(path.length()>0?"/":"")+children[i], locker);
+ if (children.length==0 && path.indexOf("/") > -1)
+ remove(sysKey, path, locker);
+
+ }
+ /**
+ * Writes all pending changes to the backends.
+ */
+ public void commit(Object locker) {
+ synchronized(locks) {
+ ArrayList lockerTransactions = (ArrayList)pendingTransactions.get(locker);
+ HashMap exceptions = new HashMap();
+ // quit if no transactions are present;
+ if (lockerTransactions == null) return;
+ for (Iterator i = lockerTransactions.iterator();i.hasNext();) {
+ TransactionEntry thisEntry = (TransactionEntry)i.next();
+ try {
+ if (thisEntry.obj == null)
+ storage.remove(thisEntry.sysKey, thisEntry.path);
+ else
+ storage.put(thisEntry.sysKey, thisEntry.obj);
+ locks.remove(thisEntry.sysKey);
+ } catch (Exception e) {
+ exceptions.put(thisEntry, e);
+ }
+ }
+ pendingTransactions.remove(locker);
+ if (exceptions.size() > 0) { // oh dear
+ Logger.error("TransactionManager.commit() - Problems during transaction commit of locker "+locker.toString()+". Database may be in an inconsistent state.");
+ for (Iterator iter = exceptions.keySet().iterator(); iter.hasNext();) {
+ TransactionEntry entry = (TransactionEntry) iter.next();
+ Exception ex = (Exception)exceptions.get(entry);
+ Logger.msg(entry.toString());
+ Logger.error(ex);
+ }
+ dumpPendingTransactions(0);
+ Logger.die("Database failure");
+ }
+
+ }
+ }
+
+ /**
+ * Rolls back all changes sent in the name of 'locker' and unlocks the sysKeys
+ */
+ public void abort(Object locker) {
+ synchronized(locks) {
+ if (locks.containsValue(locker)) {
+ for (Iterator i=locks.keySet().iterator(); i.hasNext();) {
+ Integer thisKey = (Integer)i.next();
+ if (locks.get(thisKey).equals(locker))
+ locks.remove(thisKey);
+ }
+ }
+ pendingTransactions.remove(locker);
+ }
+ }
+
+ public void clearCache(int sysKey, String path) {
+ if (sysKey == -1)
+ storage.clearCache();
+ else if (path == null)
+ storage.clearCache(new Integer(sysKey));
+ else
+ storage.clearCache(new Integer(sysKey), path);
+
+ }
+
+ public void dumpPendingTransactions(int logLevel) {
+ Logger.msg(logLevel, "================");
+ Logger.msg(logLevel, "Transaction dump");
+ Logger.msg(logLevel, "Locked Items:");
+ if (locks.size() == 0)
+ Logger.msg(logLevel, " None");
+ else
+ for (Iterator iter = locks.keySet().iterator(); iter.hasNext();) {
+ Integer thisKey = (Integer)iter.next();
+ Object locker = locks.get(thisKey);
+ Logger.msg(logLevel, " "+thisKey+" locked by "+locker);
+ }
+
+ Logger.msg(logLevel, "Open transactions:");
+ if (pendingTransactions.size() == 0)
+ Logger.msg(logLevel, " None");
+ else
+ for (Iterator iter = pendingTransactions.keySet().iterator(); iter.hasNext();) {
+ Object thisLocker = iter.next();
+ Logger.msg(logLevel, " Transaction owner:"+thisLocker);
+ ArrayList entries = (ArrayList)pendingTransactions.get(thisLocker);
+ for (Iterator iterator = entries.iterator(); iterator.hasNext();) {
+ TransactionEntry thisEntry = (TransactionEntry) iterator.next();
+ Logger.msg(logLevel, " "+thisEntry.toString());
+ }
+ }
+ }
+
+ /** Used in the transaction table to store details of a put until commit
+ */
+ class TransactionEntry {
+ public Integer sysKey;
+ public String path;
+ public C2KLocalObject obj;
+ public TransactionEntry(Integer sysKey, String path, C2KLocalObject obj) {
+ this.sysKey = sysKey;
+ this.path = path;
+ this.obj = obj;
+ }
+
+ public String getPath() {
+ return ClusterStorage.getPath(obj);
+ }
+
+ public String toString() {
+ StringBuffer report = new StringBuffer();
+ if (obj == null)
+ report.append("Delete");
+ else
+ report.append("Put "+obj.getClass().getName());
+ report.append(" at ").append(path).append(" in ").append(sysKey);
+ return report.toString();
+
+ }
+ /**
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode() {
+ return sysKey.hashCode()*getPath().hashCode();
+ }
+
+ /**
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object other) {
+ if (other instanceof TransactionEntry)
+ return hashCode() == ((TransactionEntry)other).hashCode();
+ return false;
+ }
+
+ }
+
+ public Object query(String id, Object query) throws ClusterStorageException {
+ return storage.query(id, query);
+ }
+
+ public String queryToXML(String id, String query, boolean genericFormat) throws ClusterStorageException {
+ return storage.queryToXML(id, query, genericFormat);
+ }
+
+}