From da731d2bb81666b9c697d9099da632e7dfcdc0f7 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Tue, 9 Sep 2014 12:13:21 +0200 Subject: Replaced int sysKey Item identifier with UUID, which is now portable. ItemPath objects are now used to identify Items throughout the kernel, replacing ints and Integers. --- .../persistency/ClusterStorageManager.java | 127 +++++++++------------ 1 file changed, 57 insertions(+), 70 deletions(-) (limited to 'src/main/java/com/c2kernel/persistency/ClusterStorageManager.java') diff --git a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java index 5227ab8..c82a50d 100644 --- a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java +++ b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java @@ -12,6 +12,7 @@ import com.c2kernel.entity.C2KLocalObject; import com.c2kernel.entity.agent.JobList; import com.c2kernel.entity.proxy.ProxyMessage; import com.c2kernel.events.History; +import com.c2kernel.lookup.ItemPath; import com.c2kernel.persistency.outcome.Outcome; import com.c2kernel.persistency.outcome.Viewpoint; import com.c2kernel.process.Gateway; @@ -23,10 +24,9 @@ import com.c2kernel.utils.WeakCache; /** * instantiates ClusterStorages listed in properties file All read/write requests to storage pass through this object, which * can query the capabilities of each declared storage, and channel requests accordingly. Transaction based. - * - * @author abranson - * @author ogattaz - * + * + * * @version $Revision: 1.62 $ $Date: 2006/02/01 13:27:46 $ + * @author $Author: abranson $ */ public class ClusterStorageManager { HashMap allStores = new HashMap(); @@ -34,22 +34,12 @@ public class ClusterStorageManager { HashMap> clusterWriters = new HashMap>(); HashMap> clusterReaders = new HashMap>(); // we don't need a soft cache for the top level cache - the proxies and entities clear that when reaped - HashMap> memoryCache = new HashMap>(); + HashMap> memoryCache = new HashMap>(); - /** - * Initialises all ClusterStorage handlers listed by class name in the - * property "ClusterStorages" This property is usually process specific, and - * so should be in the server/client.conf and not the connect file. - * - * 2014/08/28 ogattaz : put in place a protection in the constructor : set - * the clusterPriority at the right size according the fact that the - * "clusterStorageProp" property could contains a List of instance of String - * and/or ClusterStorage - * - * @param auth - * an instance of Authenticator - * @throws ClusterStorageException - */ + /** + * Initialises all ClusterStorage handlers listed by class name in the property "ClusterStorages" + * This property is usually process specific, and so should be in the server/client.conf and not the connect file. + */ public ClusterStorageManager(Authenticator auth) throws ClusterStorageException { Object clusterStorageProp = Gateway.getProperties().getObject("ClusterStorage"); if (clusterStorageProp == null || clusterStorageProp.equals("")) { @@ -60,13 +50,10 @@ public class ClusterStorageManager { if (clusterStorageProp instanceof String) rootStores = instantiateStores((String)clusterStorageProp); else if (clusterStorageProp instanceof ArrayList) { - ArrayList wTempStorages = (ArrayList)clusterStorageProp; - - // set the clusterPriority at the right size - clusterPriority = new String[wTempStorages.size()]; - + ArrayList propStores = (ArrayList)clusterStorageProp; rootStores = new ArrayList(); - for (Object thisStore : wTempStorages) { + clusterPriority = new String[propStores.size()]; + for (Object thisStore : propStores) { if (thisStore instanceof ClusterStorage) rootStores.add((ClusterStorage)thisStore); else @@ -89,7 +76,7 @@ public class ClusterStorageManager { Logger.msg(5, "ClusterStorageManager.init() - Cluster storage " + newStorage.getClass().getName() + " initialised successfully."); allStores.put(newStorage.getId(), newStorage); - clusterPriority[clusterNo++] = newStorage.getId(); + clusterPriority[clusterNo++] = newStorage.getId(); } clusterReaders.put(ClusterStorage.ROOT, rootStores); // all storages are queried for clusters at the root level @@ -170,7 +157,7 @@ public class ClusterStorageManager { * Retrieves the ids of the next level of a cluster * Does not look in any currently open transactions. */ - public String[] getClusterContents(Integer sysKey, String path) throws ClusterStorageException { + public String[] getClusterContents(ItemPath itemPath, String path) throws ClusterStorageException { ArrayList contents = new ArrayList(); // get all readers @@ -179,7 +166,7 @@ public class ClusterStorageManager { // try each in turn until we get a result for (ClusterStorage thisReader : readers) { try { - String[] thisArr = thisReader.getClusterContents(sysKey, path); + String[] thisArr = thisReader.getClusterContents(itemPath, path); if (thisArr != null) { for (int j = 0; j < thisArr.length; j++) if (!contents.contains(thisArr[j])) { @@ -189,7 +176,7 @@ public class ClusterStorageManager { } } catch (ClusterStorageException e) { Logger.msg(5, "ClusterStorageManager.getClusterContents() - reader " + thisReader.getName() + - " could not retrieve contents of " + sysKey + "/" + path + ": " + e.getMessage()); + " could not retrieve contents of " + itemPath + "/" + path + ": " + e.getMessage()); } } @@ -199,16 +186,16 @@ public class ClusterStorageManager { } /** Internal get method. Retrieves clusters from ClusterStorages & maintains the memory cache */ - public C2KLocalObject get(Integer sysKeyIntObj, String path) throws ClusterStorageException, ObjectNotFoundException { + public C2KLocalObject get(ItemPath itemPath, String path) throws ClusterStorageException, ObjectNotFoundException { C2KLocalObject result = null; // check cache first Map sysKeyMemCache = null; - sysKeyMemCache = memoryCache.get(sysKeyIntObj); + sysKeyMemCache = memoryCache.get(itemPath); if (sysKeyMemCache != null) { synchronized(sysKeyMemCache) { C2KLocalObject obj = sysKeyMemCache.get(path); if (obj != null) { - Logger.msg(7, "ClusterStorageManager.get() - found "+sysKeyIntObj+"/"+path+" in memcache"); + Logger.msg(7, "ClusterStorageManager.get() - found "+itemPath+"/"+path+" in memcache"); return obj; } } @@ -220,7 +207,7 @@ public class ClusterStorageManager { StringTokenizer tok = new StringTokenizer(path,"/"); if (tok.countTokens() == 4) { // to not catch viewpoints called 'data' Outcome data = null; - Viewpoint view = (Viewpoint)get(sysKeyIntObj, path.substring(0, path.lastIndexOf("/"))); + Viewpoint view = (Viewpoint)get(itemPath, path.substring(0, path.lastIndexOf("/"))); if (view != null) data = view.getOutcome(); return data; @@ -230,9 +217,9 @@ public class ClusterStorageManager { // deal out top level remote maps if (path.indexOf('/') == -1) { if (path.equals(ClusterStorage.HISTORY)) - result = new History(sysKeyIntObj, null); + result = new History(itemPath, null); if (path.equals(ClusterStorage.JOB)) - result = new JobList(sysKeyIntObj, null); + result = new JobList(itemPath, null); if (result!=null) { synchronized(sysKeyMemCache) { sysKeyMemCache.put(path, result); @@ -246,16 +233,16 @@ public class ClusterStorageManager { ArrayList readers = findStorages(ClusterStorage.getClusterType(path), false); for (ClusterStorage thisReader : readers) { try { - result = thisReader.get(sysKeyIntObj, path); - Logger.msg(7, "ClusterStorageManager.get() - reading "+path+" from "+thisReader.getName() + " for intkey=" + sysKeyIntObj); + result = thisReader.get(itemPath, path); + Logger.msg(7, "ClusterStorageManager.get() - reading "+path+" from "+thisReader.getName() + " for item " + itemPath); if (result != null) { // got it! // store it in the cache if (sysKeyMemCache == null) { // create cache if needed boolean useWeak = Gateway.getProperties().getBoolean("Storage.useWeakCache", false); - Logger.msg(7,"ClusterStorageManager.put() - Creating "+(useWeak?"Weak":"Strong")+" cache for entity "+sysKeyIntObj); + Logger.msg(7,"ClusterStorageManager.put() - Creating "+(useWeak?"Weak":"Strong")+" cache for item "+itemPath); sysKeyMemCache = useWeak?new WeakCache():new SoftCache(0); synchronized (memoryCache) { - memoryCache.put(sysKeyIntObj, sysKeyMemCache); + memoryCache.put(itemPath, sysKeyMemCache); } } synchronized(sysKeyMemCache) { @@ -265,37 +252,37 @@ public class ClusterStorageManager { return result; } } catch (ClusterStorageException e) { - Logger.msg(7, "ClusterStorageManager.get() - reader " + thisReader.getName() + " could not retrieve " + sysKeyIntObj + + Logger.msg(7, "ClusterStorageManager.get() - reader " + thisReader.getName() + " could not retrieve " + itemPath + "/" + path + ": " + e.getMessage()); } } - throw new ObjectNotFoundException("ClusterStorageManager.get() - Path " + path + " not found in " + sysKeyIntObj, ""); + throw new ObjectNotFoundException("ClusterStorageManager.get() - Path " + path + " not found in " + itemPath, ""); } /** Internal put method. Creates or overwrites a cluster in all writers. Used when committing transactions. */ - public void put(Integer sysKeyIntObj, C2KLocalObject obj) throws ClusterStorageException { + public void put(ItemPath itemPath, C2KLocalObject obj) throws ClusterStorageException { String path = ClusterStorage.getPath(obj); ArrayList writers = findStorages(ClusterStorage.getClusterType(path), true); for (ClusterStorage thisWriter : writers) { try { Logger.msg(7, "ClusterStorageManager.put() - writing "+path+" to "+thisWriter.getName()); - thisWriter.put(sysKeyIntObj, obj); + thisWriter.put(itemPath, obj); } catch (ClusterStorageException e) { Logger.error("ClusterStorageManager.put() - writer " + thisWriter.getName() + " could not store " + - sysKeyIntObj + "/" + path + ": " + e.getMessage()); + itemPath + "/" + path + ": " + e.getMessage()); throw e; } } // put in mem cache if that worked Map sysKeyMemCache; - if (memoryCache.containsKey(sysKeyIntObj)) - sysKeyMemCache = memoryCache.get(sysKeyIntObj); + if (memoryCache.containsKey(itemPath)) + sysKeyMemCache = memoryCache.get(itemPath); else { boolean useWeak = Gateway.getProperties().getBoolean("Storage.useWeakCache", false); - Logger.msg(7,"ClusterStorageManager.put() - Creating "+(useWeak?"Weak":"Strong")+" cache for entity "+sysKeyIntObj); + Logger.msg(7,"ClusterStorageManager.put() - Creating "+(useWeak?"Weak":"Strong")+" cache for entity "+itemPath); sysKeyMemCache = useWeak?new WeakCache():new SoftCache(0); synchronized (memoryCache) { - memoryCache.put(sysKeyIntObj, sysKeyMemCache); + memoryCache.put(itemPath, sysKeyMemCache); } } @@ -306,45 +293,45 @@ public class ClusterStorageManager { if (Logger.doLog(9)) dumpCacheContents(9); // transmit proxy event - Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED)); + Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(itemPath, path, ProxyMessage.ADDED)); } /** Deletes a cluster from all writers */ - public void remove(Integer sysKeyIntObj, String path) throws ClusterStorageException { + public void remove(ItemPath itemPath, String path) throws ClusterStorageException { ArrayList writers = findStorages(ClusterStorage.getClusterType(path), true); for (ClusterStorage thisWriter : writers) { try { Logger.msg(7, "ClusterStorageManager.delete() - removing "+path+" from "+thisWriter.getName()); - thisWriter.delete(sysKeyIntObj, path); + thisWriter.delete(itemPath, path); } catch (ClusterStorageException e) { - Logger.error("ClusterStorageManager.delete() - writer " + thisWriter.getName() + " could not delete " + sysKeyIntObj + + Logger.error("ClusterStorageManager.delete() - writer " + thisWriter.getName() + " could not delete " + itemPath + "/" + path + ": " + e.getMessage()); throw e; } } - if (memoryCache.containsKey(sysKeyIntObj)) { - Map sysKeyMemCache = memoryCache.get(sysKeyIntObj); - synchronized (sysKeyMemCache) { - sysKeyMemCache.remove(path); + if (memoryCache.containsKey(itemPath)) { + Map itemMemCache = memoryCache.get(itemPath); + synchronized (itemMemCache) { + itemMemCache.remove(path); } } // transmit proxy event - Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED)); + Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(itemPath, path, ProxyMessage.DELETED)); } - public void clearCache(Integer sysKeyIntObj, String path) { - Logger.msg(7, "CSM.clearCache() - removing "+sysKeyIntObj+"/"+path); + public void clearCache(ItemPath itemPath, String path) { + Logger.msg(7, "CSM.clearCache() - removing "+itemPath+"/"+path); - if (memoryCache.containsKey(sysKeyIntObj)) { - Map sysKeyMemCache = memoryCache.get(sysKeyIntObj); + if (memoryCache.containsKey(itemPath)) { + Map sysKeyMemCache = memoryCache.get(itemPath); synchronized(sysKeyMemCache) { for (Iterator iter = sysKeyMemCache.keySet().iterator(); iter.hasNext();) { String thisPath = iter.next(); if (thisPath.startsWith(path)) { - Logger.msg(7, "CSM.clearCache() - removing "+sysKeyIntObj+"/"+thisPath); + Logger.msg(7, "CSM.clearCache() - removing "+itemPath+"/"+thisPath); iter.remove(); } } @@ -352,18 +339,18 @@ public class ClusterStorageManager { } } - public void clearCache(Integer sysKeyIntObj) { + public void clearCache(ItemPath itemPath) { - Logger.msg(5, "CSM.clearCache() - removing entire cache of "+sysKeyIntObj); + Logger.msg(5, "CSM.clearCache() - removing entire cache of "+itemPath); - if (memoryCache.containsKey(sysKeyIntObj)) { + if (memoryCache.containsKey(itemPath)) { synchronized (memoryCache) { if (Logger.doLog(6)) { - Map sysKeyMemCache = memoryCache.get(sysKeyIntObj); + Map sysKeyMemCache = memoryCache.get(itemPath); int size = sysKeyMemCache.size(); Logger.msg(6, "CSM.clearCache() - "+size+" objects to remove."); } - memoryCache.remove(sysKeyIntObj); + memoryCache.remove(itemPath); } } else @@ -380,9 +367,9 @@ public class ClusterStorageManager { public void dumpCacheContents(int logLevel) { if (!Logger.doLog(logLevel)) return; synchronized(memoryCache) { - for (Integer sysKey : memoryCache.keySet()) { - Logger.msg(logLevel, "Cached Objects of Entity "+sysKey); - Map sysKeyMemCache = memoryCache.get(sysKey); + for (ItemPath itemPath : memoryCache.keySet()) { + Logger.msg(logLevel, "Cached Objects of Entity "+itemPath); + Map sysKeyMemCache = memoryCache.get(itemPath); try { synchronized(sysKeyMemCache) { for (Object name : sysKeyMemCache.keySet()) { -- cgit v1.2.3