summaryrefslogtreecommitdiff
path: root/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java
diff options
context:
space:
mode:
authorAndrew Branson <andrew.branson@cern.ch>2014-09-09 12:13:21 +0200
committerAndrew Branson <andrew.branson@cern.ch>2014-09-09 12:13:21 +0200
commitda731d2bb81666b9c697d9099da632e7dfcdc0f7 (patch)
tree567693c3c48f3d15ecbb2dac4f9db03bb6e58c72 /src/main/java/com/c2kernel/persistency/ClusterStorageManager.java
parentae1e79e33fd30e3d8bcedbef8891a14a048276d7 (diff)
Replaced int sysKey Item identifier with UUID, which is now portable.
ItemPath objects are now used to identify Items throughout the kernel, replacing ints and Integers.
Diffstat (limited to 'src/main/java/com/c2kernel/persistency/ClusterStorageManager.java')
-rw-r--r--src/main/java/com/c2kernel/persistency/ClusterStorageManager.java127
1 files changed, 57 insertions, 70 deletions
diff --git a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java
index 5227ab8..c82a50d 100644
--- a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java
+++ b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java
@@ -12,6 +12,7 @@ import com.c2kernel.entity.C2KLocalObject;
import com.c2kernel.entity.agent.JobList;
import com.c2kernel.entity.proxy.ProxyMessage;
import com.c2kernel.events.History;
+import com.c2kernel.lookup.ItemPath;
import com.c2kernel.persistency.outcome.Outcome;
import com.c2kernel.persistency.outcome.Viewpoint;
import com.c2kernel.process.Gateway;
@@ -23,10 +24,9 @@ import com.c2kernel.utils.WeakCache;
/**
* instantiates ClusterStorages listed in properties file All read/write requests to storage pass through this object, which
* can query the capabilities of each declared storage, and channel requests accordingly. Transaction based.
- *
- * @author abranson
- * @author ogattaz
- *
+ *
+ * * @version $Revision: 1.62 $ $Date: 2006/02/01 13:27:46 $
+ * @author $Author: abranson $
*/
public class ClusterStorageManager {
HashMap<String, ClusterStorage> allStores = new HashMap<String, ClusterStorage>();
@@ -34,22 +34,12 @@ public class ClusterStorageManager {
HashMap<String, ArrayList<ClusterStorage>> clusterWriters = new HashMap<String, ArrayList<ClusterStorage>>();
HashMap<String, ArrayList<ClusterStorage>> clusterReaders = new HashMap<String, ArrayList<ClusterStorage>>();
// we don't need a soft cache for the top level cache - the proxies and entities clear that when reaped
- HashMap<Integer, Map<String, C2KLocalObject>> memoryCache = new HashMap<Integer, Map<String, C2KLocalObject>>();
+ HashMap<ItemPath, Map<String, C2KLocalObject>> memoryCache = new HashMap<ItemPath, Map<String, C2KLocalObject>>();
- /**
- * Initialises all ClusterStorage handlers listed by class name in the
- * property "ClusterStorages" This property is usually process specific, and
- * so should be in the server/client.conf and not the connect file.
- *
- * 2014/08/28 ogattaz : put in place a protection in the constructor : set
- * the clusterPriority at the right size according the fact that the
- * "clusterStorageProp" property could contains a List of instance of String
- * and/or ClusterStorage
- *
- * @param auth
- * an instance of Authenticator
- * @throws ClusterStorageException
- */
+ /**
+ * Initialises all ClusterStorage handlers listed by class name in the property "ClusterStorages"
+ * This property is usually process specific, and so should be in the server/client.conf and not the connect file.
+ */
public ClusterStorageManager(Authenticator auth) throws ClusterStorageException {
Object clusterStorageProp = Gateway.getProperties().getObject("ClusterStorage");
if (clusterStorageProp == null || clusterStorageProp.equals("")) {
@@ -60,13 +50,10 @@ public class ClusterStorageManager {
if (clusterStorageProp instanceof String)
rootStores = instantiateStores((String)clusterStorageProp);
else if (clusterStorageProp instanceof ArrayList<?>) {
- ArrayList<?> wTempStorages = (ArrayList<?>)clusterStorageProp;
-
- // set the clusterPriority at the right size
- clusterPriority = new String[wTempStorages.size()];
-
+ ArrayList<?> propStores = (ArrayList<?>)clusterStorageProp;
rootStores = new ArrayList<ClusterStorage>();
- for (Object thisStore : wTempStorages) {
+ clusterPriority = new String[propStores.size()];
+ for (Object thisStore : propStores) {
if (thisStore instanceof ClusterStorage)
rootStores.add((ClusterStorage)thisStore);
else
@@ -89,7 +76,7 @@ public class ClusterStorageManager {
Logger.msg(5, "ClusterStorageManager.init() - Cluster storage " + newStorage.getClass().getName() +
" initialised successfully.");
allStores.put(newStorage.getId(), newStorage);
- clusterPriority[clusterNo++] = newStorage.getId();
+ clusterPriority[clusterNo++] = newStorage.getId();
}
clusterReaders.put(ClusterStorage.ROOT, rootStores); // all storages are queried for clusters at the root level
@@ -170,7 +157,7 @@ public class ClusterStorageManager {
* Retrieves the ids of the next level of a cluster
* Does not look in any currently open transactions.
*/
- public String[] getClusterContents(Integer sysKey, String path) throws ClusterStorageException {
+ public String[] getClusterContents(ItemPath itemPath, String path) throws ClusterStorageException {
ArrayList<String> contents = new ArrayList<String>();
// get all readers
@@ -179,7 +166,7 @@ public class ClusterStorageManager {
// try each in turn until we get a result
for (ClusterStorage thisReader : readers) {
try {
- String[] thisArr = thisReader.getClusterContents(sysKey, path);
+ String[] thisArr = thisReader.getClusterContents(itemPath, path);
if (thisArr != null) {
for (int j = 0; j < thisArr.length; j++)
if (!contents.contains(thisArr[j])) {
@@ -189,7 +176,7 @@ public class ClusterStorageManager {
}
} catch (ClusterStorageException e) {
Logger.msg(5, "ClusterStorageManager.getClusterContents() - reader " + thisReader.getName() +
- " could not retrieve contents of " + sysKey + "/" + path + ": " + e.getMessage());
+ " could not retrieve contents of " + itemPath + "/" + path + ": " + e.getMessage());
}
}
@@ -199,16 +186,16 @@ public class ClusterStorageManager {
}
/** Internal get method. Retrieves clusters from ClusterStorages & maintains the memory cache */
- public C2KLocalObject get(Integer sysKeyIntObj, String path) throws ClusterStorageException, ObjectNotFoundException {
+ public C2KLocalObject get(ItemPath itemPath, String path) throws ClusterStorageException, ObjectNotFoundException {
C2KLocalObject result = null;
// check cache first
Map<String, C2KLocalObject> sysKeyMemCache = null;
- sysKeyMemCache = memoryCache.get(sysKeyIntObj);
+ sysKeyMemCache = memoryCache.get(itemPath);
if (sysKeyMemCache != null) {
synchronized(sysKeyMemCache) {
C2KLocalObject obj = sysKeyMemCache.get(path);
if (obj != null) {
- Logger.msg(7, "ClusterStorageManager.get() - found "+sysKeyIntObj+"/"+path+" in memcache");
+ Logger.msg(7, "ClusterStorageManager.get() - found "+itemPath+"/"+path+" in memcache");
return obj;
}
}
@@ -220,7 +207,7 @@ public class ClusterStorageManager {
StringTokenizer tok = new StringTokenizer(path,"/");
if (tok.countTokens() == 4) { // to not catch viewpoints called 'data'
Outcome data = null;
- Viewpoint view = (Viewpoint)get(sysKeyIntObj, path.substring(0, path.lastIndexOf("/")));
+ Viewpoint view = (Viewpoint)get(itemPath, path.substring(0, path.lastIndexOf("/")));
if (view != null)
data = view.getOutcome();
return data;
@@ -230,9 +217,9 @@ public class ClusterStorageManager {
// deal out top level remote maps
if (path.indexOf('/') == -1) {
if (path.equals(ClusterStorage.HISTORY))
- result = new History(sysKeyIntObj, null);
+ result = new History(itemPath, null);
if (path.equals(ClusterStorage.JOB))
- result = new JobList(sysKeyIntObj, null);
+ result = new JobList(itemPath, null);
if (result!=null) {
synchronized(sysKeyMemCache) {
sysKeyMemCache.put(path, result);
@@ -246,16 +233,16 @@ public class ClusterStorageManager {
ArrayList<ClusterStorage> readers = findStorages(ClusterStorage.getClusterType(path), false);
for (ClusterStorage thisReader : readers) {
try {
- result = thisReader.get(sysKeyIntObj, path);
- Logger.msg(7, "ClusterStorageManager.get() - reading "+path+" from "+thisReader.getName() + " for intkey=" + sysKeyIntObj);
+ result = thisReader.get(itemPath, path);
+ Logger.msg(7, "ClusterStorageManager.get() - reading "+path+" from "+thisReader.getName() + " for item " + itemPath);
if (result != null) { // got it!
// store it in the cache
if (sysKeyMemCache == null) { // create cache if needed
boolean useWeak = Gateway.getProperties().getBoolean("Storage.useWeakCache", false);
- Logger.msg(7,"ClusterStorageManager.put() - Creating "+(useWeak?"Weak":"Strong")+" cache for entity "+sysKeyIntObj);
+ Logger.msg(7,"ClusterStorageManager.put() - Creating "+(useWeak?"Weak":"Strong")+" cache for item "+itemPath);
sysKeyMemCache = useWeak?new WeakCache<String, C2KLocalObject>():new SoftCache<String, C2KLocalObject>(0);
synchronized (memoryCache) {
- memoryCache.put(sysKeyIntObj, sysKeyMemCache);
+ memoryCache.put(itemPath, sysKeyMemCache);
}
}
synchronized(sysKeyMemCache) {
@@ -265,37 +252,37 @@ public class ClusterStorageManager {
return result;
}
} catch (ClusterStorageException e) {
- Logger.msg(7, "ClusterStorageManager.get() - reader " + thisReader.getName() + " could not retrieve " + sysKeyIntObj +
+ Logger.msg(7, "ClusterStorageManager.get() - reader " + thisReader.getName() + " could not retrieve " + itemPath +
"/" + path + ": " + e.getMessage());
}
}
- throw new ObjectNotFoundException("ClusterStorageManager.get() - Path " + path + " not found in " + sysKeyIntObj, "");
+ throw new ObjectNotFoundException("ClusterStorageManager.get() - Path " + path + " not found in " + itemPath, "");
}
/** Internal put method. Creates or overwrites a cluster in all writers. Used when committing transactions. */
- public void put(Integer sysKeyIntObj, C2KLocalObject obj) throws ClusterStorageException {
+ public void put(ItemPath itemPath, C2KLocalObject obj) throws ClusterStorageException {
String path = ClusterStorage.getPath(obj);
ArrayList<ClusterStorage> writers = findStorages(ClusterStorage.getClusterType(path), true);
for (ClusterStorage thisWriter : writers) {
try {
Logger.msg(7, "ClusterStorageManager.put() - writing "+path+" to "+thisWriter.getName());
- thisWriter.put(sysKeyIntObj, obj);
+ thisWriter.put(itemPath, obj);
} catch (ClusterStorageException e) {
Logger.error("ClusterStorageManager.put() - writer " + thisWriter.getName() + " could not store " +
- sysKeyIntObj + "/" + path + ": " + e.getMessage());
+ itemPath + "/" + path + ": " + e.getMessage());
throw e;
}
}
// put in mem cache if that worked
Map<String, C2KLocalObject> sysKeyMemCache;
- if (memoryCache.containsKey(sysKeyIntObj))
- sysKeyMemCache = memoryCache.get(sysKeyIntObj);
+ if (memoryCache.containsKey(itemPath))
+ sysKeyMemCache = memoryCache.get(itemPath);
else {
boolean useWeak = Gateway.getProperties().getBoolean("Storage.useWeakCache", false);
- Logger.msg(7,"ClusterStorageManager.put() - Creating "+(useWeak?"Weak":"Strong")+" cache for entity "+sysKeyIntObj);
+ Logger.msg(7,"ClusterStorageManager.put() - Creating "+(useWeak?"Weak":"Strong")+" cache for entity "+itemPath);
sysKeyMemCache = useWeak?new WeakCache<String, C2KLocalObject>():new SoftCache<String, C2KLocalObject>(0);
synchronized (memoryCache) {
- memoryCache.put(sysKeyIntObj, sysKeyMemCache);
+ memoryCache.put(itemPath, sysKeyMemCache);
}
}
@@ -306,45 +293,45 @@ public class ClusterStorageManager {
if (Logger.doLog(9)) dumpCacheContents(9);
// transmit proxy event
- Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED));
+ Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(itemPath, path, ProxyMessage.ADDED));
}
/** Deletes a cluster from all writers */
- public void remove(Integer sysKeyIntObj, String path) throws ClusterStorageException {
+ public void remove(ItemPath itemPath, String path) throws ClusterStorageException {
ArrayList<ClusterStorage> writers = findStorages(ClusterStorage.getClusterType(path), true);
for (ClusterStorage thisWriter : writers) {
try {
Logger.msg(7, "ClusterStorageManager.delete() - removing "+path+" from "+thisWriter.getName());
- thisWriter.delete(sysKeyIntObj, path);
+ thisWriter.delete(itemPath, path);
} catch (ClusterStorageException e) {
- Logger.error("ClusterStorageManager.delete() - writer " + thisWriter.getName() + " could not delete " + sysKeyIntObj +
+ Logger.error("ClusterStorageManager.delete() - writer " + thisWriter.getName() + " could not delete " + itemPath +
"/" + path + ": " + e.getMessage());
throw e;
}
}
- if (memoryCache.containsKey(sysKeyIntObj)) {
- Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(sysKeyIntObj);
- synchronized (sysKeyMemCache) {
- sysKeyMemCache.remove(path);
+ if (memoryCache.containsKey(itemPath)) {
+ Map<String, C2KLocalObject> itemMemCache = memoryCache.get(itemPath);
+ synchronized (itemMemCache) {
+ itemMemCache.remove(path);
}
}
// transmit proxy event
- Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED));
+ Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(itemPath, path, ProxyMessage.DELETED));
}
- public void clearCache(Integer sysKeyIntObj, String path) {
- Logger.msg(7, "CSM.clearCache() - removing "+sysKeyIntObj+"/"+path);
+ public void clearCache(ItemPath itemPath, String path) {
+ Logger.msg(7, "CSM.clearCache() - removing "+itemPath+"/"+path);
- if (memoryCache.containsKey(sysKeyIntObj)) {
- Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(sysKeyIntObj);
+ if (memoryCache.containsKey(itemPath)) {
+ Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(itemPath);
synchronized(sysKeyMemCache) {
for (Iterator<String> iter = sysKeyMemCache.keySet().iterator(); iter.hasNext();) {
String thisPath = iter.next();
if (thisPath.startsWith(path)) {
- Logger.msg(7, "CSM.clearCache() - removing "+sysKeyIntObj+"/"+thisPath);
+ Logger.msg(7, "CSM.clearCache() - removing "+itemPath+"/"+thisPath);
iter.remove();
}
}
@@ -352,18 +339,18 @@ public class ClusterStorageManager {
}
}
- public void clearCache(Integer sysKeyIntObj) {
+ public void clearCache(ItemPath itemPath) {
- Logger.msg(5, "CSM.clearCache() - removing entire cache of "+sysKeyIntObj);
+ Logger.msg(5, "CSM.clearCache() - removing entire cache of "+itemPath);
- if (memoryCache.containsKey(sysKeyIntObj)) {
+ if (memoryCache.containsKey(itemPath)) {
synchronized (memoryCache) {
if (Logger.doLog(6)) {
- Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(sysKeyIntObj);
+ Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(itemPath);
int size = sysKeyMemCache.size();
Logger.msg(6, "CSM.clearCache() - "+size+" objects to remove.");
}
- memoryCache.remove(sysKeyIntObj);
+ memoryCache.remove(itemPath);
}
}
else
@@ -380,9 +367,9 @@ public class ClusterStorageManager {
public void dumpCacheContents(int logLevel) {
if (!Logger.doLog(logLevel)) return;
synchronized(memoryCache) {
- for (Integer sysKey : memoryCache.keySet()) {
- Logger.msg(logLevel, "Cached Objects of Entity "+sysKey);
- Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(sysKey);
+ for (ItemPath itemPath : memoryCache.keySet()) {
+ Logger.msg(logLevel, "Cached Objects of Entity "+itemPath);
+ Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(itemPath);
try {
synchronized(sysKeyMemCache) {
for (Object name : sysKeyMemCache.keySet()) {