package com.c2kernel.persistency; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.entity.C2KLocalObject; import com.c2kernel.entity.agent.JobList; import com.c2kernel.events.History; import com.c2kernel.lookup.InvalidEntityPathException; import com.c2kernel.utils.Logger; public class TransactionManager { HashMap locks; HashMap> pendingTransactions; ClusterStorageManager storage; public TransactionManager() throws ClusterStorageException { storage = new ClusterStorageManager(); locks = new HashMap(); pendingTransactions = new HashMap>(); } public boolean hasPendingTransactions() { return pendingTransactions.size() > 0; } public ClusterStorageManager getDb() { return storage; } public void close() { if (pendingTransactions.size() != 0) { Logger.error("There were pending transactions on shutdown. All changes were lost."); dumpPendingTransactions(0); } Logger.msg("Transaction Manager: Closing storages"); storage.close(); } public String[] getClusterContents(int sysKey, String path) throws ClusterStorageException { if (path.startsWith("/") && path.length() > 1) path = path.substring(1); return storage.getClusterContents(new Integer(sysKey), path); } /** * Public get method. Required a 'locker' object for a transaction key. * Checks the transaction table first to see if the caller has uncommitted changes */ public C2KLocalObject get(int sysKey, String path, Object locker) throws ClusterStorageException, ObjectNotFoundException { if (path.startsWith("/") && path.length() > 1) path = path.substring(1); // deal out top level remote maps if (path.indexOf('/') == -1) { try { if (path.equals(ClusterStorage.HISTORY)) return new History(sysKey, locker); if (path.equals(ClusterStorage.JOB)) return new JobList(sysKey, locker); } catch (InvalidEntityPathException ex) { throw new ObjectNotFoundException("Invalid key"); } } Integer sysKeyIntObj = new Integer(sysKey); // check to see if the locker has been modifying this cluster synchronized(locks) { if (locks.containsKey(sysKeyIntObj) && locks.get(sysKeyIntObj).equals(locker)) { ArrayList lockerTransaction = (ArrayList)pendingTransactions.get(locker); for (Iterator i = lockerTransaction.iterator(); i.hasNext(); ) { TransactionEntry thisEntry = (TransactionEntry)i.next(); if (sysKey == thisEntry.sysKey.intValue() && path.equals(thisEntry.getPath())) { if (thisEntry.obj == null) throw new ClusterStorageException("ClusterStorageManager.get() - Cluster " + path + " has been deleted in " + sysKey + " but not yet committed"); return thisEntry.obj; } } } } return storage.get(sysKeyIntObj, path); } /** * Public put method. Manages the transaction table keyed by the object 'locker'. * If this object is null, transaction support is bypassed (so long as no lock exists on that object). */ public void put(int sysKey, C2KLocalObject obj, Object locker) throws ClusterStorageException { Integer sysKeyIntObj = new Integer(sysKey); ArrayList lockerTransaction; String path = ClusterStorage.getPath(obj); synchronized(locks) { // look to see if this object is already locked if (locks.containsKey(sysKeyIntObj)) { // if it's this locker, get the transaction list Object thisLocker = locks.get(sysKeyIntObj); if (thisLocker.equals(locker)) // retrieve the transaction list lockerTransaction = pendingTransactions.get(locker); else // locked by someone else throw new ClusterStorageException("ClusterStorageManager.get() - Access denied: Object " + sysKeyIntObj + " has been locked for writing by " + thisLocker); } else { // either we are the locker, or there is no locker if (locker == null) { // non-locking put/delete storage.put(sysKeyIntObj, obj); return; } else {// initialise the transaction locks.put(sysKeyIntObj, locker); lockerTransaction = new ArrayList(); pendingTransactions.put(locker, lockerTransaction); } } // create the new entry in the transaction table TransactionEntry newEntry = new TransactionEntry(sysKeyIntObj, path, obj); /* equals() in TransactionEntry only compares sysKey and path, so we can use * contains() in ArrayList to looks for preexisting entries for this cluster * and overwrite them. */ if (lockerTransaction.contains(newEntry)) lockerTransaction.remove(newEntry); lockerTransaction.add(newEntry); } } /** Public delete method. Uses the put method, with null as the object value. */ public void remove(int sysKey, String path, Object locker) throws ClusterStorageException { Integer sysKeyIntObj = new Integer(sysKey); ArrayList lockerTransaction; synchronized(locks) { // look to see if this object is already locked if (locks.containsKey(sysKeyIntObj)) { // if it's this locker, get the transaction list Object thisLocker = locks.get(sysKeyIntObj); if (thisLocker.equals(locker)) // retrieve the transaction list lockerTransaction = pendingTransactions.get(locker); else // locked by someone else throw new ClusterStorageException("ClusterStorageManager.get() - Access denied: Object " + sysKeyIntObj + " has been locked for writing by " + thisLocker); } else { // either we are the locker, or there is no locker if (locker == null) { // non-locking put/delete storage.remove(sysKeyIntObj, path); return; } else {// initialise the transaction locks.put(sysKeyIntObj, locker); lockerTransaction = new ArrayList(); pendingTransactions.put(locker, lockerTransaction); } } // create the new entry in the transaction table TransactionEntry newEntry = new TransactionEntry(sysKeyIntObj, path, null); /* equals() in TransactionEntry only compares sysKey and path, so we can use * contains() in ArrayList to looks for preexisting entries for this cluster * and overwrite them. */ if (lockerTransaction.contains(newEntry)) lockerTransaction.remove(newEntry); lockerTransaction.add(newEntry); } } /** * Removes all child objects from the given path * * @param sysKey - entity to delete from * @param path - root path to delete * @param locker - locking object * * @throws ClusterStorageException - when deleting fails */ public void removeCluster(int sysKey, String path, Object locker) throws ClusterStorageException { String[] children = getClusterContents(sysKey, path); for (int i = 0; i < children.length; i++) removeCluster(sysKey, path+(path.length()>0?"/":"")+children[i], locker); if (children.length==0 && path.indexOf("/") > -1) remove(sysKey, path, locker); } /** * Writes all pending changes to the backends. */ public void commit(Object locker) { synchronized(locks) { ArrayList lockerTransactions = (ArrayList)pendingTransactions.get(locker); HashMap exceptions = new HashMap(); // quit if no transactions are present; if (lockerTransactions == null) return; for (Iterator i = lockerTransactions.iterator();i.hasNext();) { TransactionEntry thisEntry = (TransactionEntry)i.next(); try { if (thisEntry.obj == null) storage.remove(thisEntry.sysKey, thisEntry.path); else storage.put(thisEntry.sysKey, thisEntry.obj); locks.remove(thisEntry.sysKey); } catch (Exception e) { exceptions.put(thisEntry, e); } } pendingTransactions.remove(locker); if (exceptions.size() > 0) { // oh dear Logger.error("TransactionManager.commit() - Problems during transaction commit of locker "+locker.toString()+". Database may be in an inconsistent state."); for (Iterator iter = exceptions.keySet().iterator(); iter.hasNext();) { TransactionEntry entry = (TransactionEntry) iter.next(); Exception ex = (Exception)exceptions.get(entry); Logger.msg(entry.toString()); Logger.error(ex); } dumpPendingTransactions(0); Logger.die("Database failure"); } } } /** * Rolls back all changes sent in the name of 'locker' and unlocks the sysKeys */ public void abort(Object locker) { synchronized(locks) { if (locks.containsValue(locker)) { for (Iterator i=locks.keySet().iterator(); i.hasNext();) { Integer thisKey = (Integer)i.next(); if (locks.get(thisKey).equals(locker)) locks.remove(thisKey); } } pendingTransactions.remove(locker); } } public void clearCache(int sysKey, String path) { if (sysKey == -1) storage.clearCache(); else if (path == null) storage.clearCache(new Integer(sysKey)); else storage.clearCache(new Integer(sysKey), path); } public void dumpPendingTransactions(int logLevel) { Logger.msg(logLevel, "================"); Logger.msg(logLevel, "Transaction dump"); Logger.msg(logLevel, "Locked Items:"); if (locks.size() == 0) Logger.msg(logLevel, " None"); else for (Iterator iter = locks.keySet().iterator(); iter.hasNext();) { Integer thisKey = (Integer)iter.next(); Object locker = locks.get(thisKey); Logger.msg(logLevel, " "+thisKey+" locked by "+locker); } Logger.msg(logLevel, "Open transactions:"); if (pendingTransactions.size() == 0) Logger.msg(logLevel, " None"); else for (Iterator iter = pendingTransactions.keySet().iterator(); iter.hasNext();) { Object thisLocker = iter.next(); Logger.msg(logLevel, " Transaction owner:"+thisLocker); ArrayList entries = (ArrayList)pendingTransactions.get(thisLocker); for (Iterator iterator = entries.iterator(); iterator.hasNext();) { TransactionEntry thisEntry = (TransactionEntry) iterator.next(); Logger.msg(logLevel, " "+thisEntry.toString()); } } } /** Used in the transaction table to store details of a put until commit */ class TransactionEntry { public Integer sysKey; public String path; public C2KLocalObject obj; public TransactionEntry(Integer sysKey, String path, C2KLocalObject obj) { this.sysKey = sysKey; this.path = path; this.obj = obj; } public String getPath() { return ClusterStorage.getPath(obj); } public String toString() { StringBuffer report = new StringBuffer(); if (obj == null) report.append("Delete"); else report.append("Put "+obj.getClass().getName()); report.append(" at ").append(path).append(" in ").append(sysKey); return report.toString(); } /** * @see java.lang.Object#hashCode() */ public int hashCode() { return sysKey.hashCode()*getPath().hashCode(); } /** * @see java.lang.Object#equals(java.lang.Object) */ public boolean equals(Object other) { if (other instanceof TransactionEntry) return hashCode() == ((TransactionEntry)other).hashCode(); return false; } } public Object query(String id, Object query) throws ClusterStorageException { return storage.query(id, query); } public String queryToXML(String id, String query, boolean genericFormat) throws ClusterStorageException { return storage.queryToXML(id, query, genericFormat); } }