diff options
Diffstat (limited to 'src/main/java/com/c2kernel/persistency/ClusterStorageManager.java')
| -rw-r--r-- | src/main/java/com/c2kernel/persistency/ClusterStorageManager.java | 127 |
1 files changed, 57 insertions, 70 deletions
diff --git a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java index 5227ab8..c82a50d 100644 --- a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java +++ b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java @@ -12,6 +12,7 @@ import com.c2kernel.entity.C2KLocalObject; import com.c2kernel.entity.agent.JobList;
import com.c2kernel.entity.proxy.ProxyMessage;
import com.c2kernel.events.History;
+import com.c2kernel.lookup.ItemPath;
import com.c2kernel.persistency.outcome.Outcome;
import com.c2kernel.persistency.outcome.Viewpoint;
import com.c2kernel.process.Gateway;
@@ -23,10 +24,9 @@ import com.c2kernel.utils.WeakCache; /**
* instantiates ClusterStorages listed in properties file All read/write requests to storage pass through this object, which
* can query the capabilities of each declared storage, and channel requests accordingly. Transaction based.
- *
- * @author abranson
- * @author ogattaz
- *
+ *
+ * * @version $Revision: 1.62 $ $Date: 2006/02/01 13:27:46 $
+ * @author $Author: abranson $
*/
public class ClusterStorageManager {
HashMap<String, ClusterStorage> allStores = new HashMap<String, ClusterStorage>();
@@ -34,22 +34,12 @@ public class ClusterStorageManager { HashMap<String, ArrayList<ClusterStorage>> clusterWriters = new HashMap<String, ArrayList<ClusterStorage>>();
HashMap<String, ArrayList<ClusterStorage>> clusterReaders = new HashMap<String, ArrayList<ClusterStorage>>();
// we don't need a soft cache for the top level cache - the proxies and entities clear that when reaped
- HashMap<Integer, Map<String, C2KLocalObject>> memoryCache = new HashMap<Integer, Map<String, C2KLocalObject>>();
+ HashMap<ItemPath, Map<String, C2KLocalObject>> memoryCache = new HashMap<ItemPath, Map<String, C2KLocalObject>>();
- /**
- * Initialises all ClusterStorage handlers listed by class name in the
- * property "ClusterStorages" This property is usually process specific, and
- * so should be in the server/client.conf and not the connect file.
- *
- * 2014/08/28 ogattaz : put in place a protection in the constructor : set
- * the clusterPriority at the right size according the fact that the
- * "clusterStorageProp" property could contains a List of instance of String
- * and/or ClusterStorage
- *
- * @param auth
- * an instance of Authenticator
- * @throws ClusterStorageException
- */
+ /**
+ * Initialises all ClusterStorage handlers listed by class name in the property "ClusterStorages"
+ * This property is usually process specific, and so should be in the server/client.conf and not the connect file.
+ */
public ClusterStorageManager(Authenticator auth) throws ClusterStorageException {
Object clusterStorageProp = Gateway.getProperties().getObject("ClusterStorage");
if (clusterStorageProp == null || clusterStorageProp.equals("")) {
@@ -60,13 +50,10 @@ public class ClusterStorageManager { if (clusterStorageProp instanceof String)
rootStores = instantiateStores((String)clusterStorageProp);
else if (clusterStorageProp instanceof ArrayList<?>) {
- ArrayList<?> wTempStorages = (ArrayList<?>)clusterStorageProp;
-
- // set the clusterPriority at the right size
- clusterPriority = new String[wTempStorages.size()];
-
+ ArrayList<?> propStores = (ArrayList<?>)clusterStorageProp;
rootStores = new ArrayList<ClusterStorage>();
- for (Object thisStore : wTempStorages) {
+ clusterPriority = new String[propStores.size()];
+ for (Object thisStore : propStores) {
if (thisStore instanceof ClusterStorage)
rootStores.add((ClusterStorage)thisStore);
else
@@ -89,7 +76,7 @@ public class ClusterStorageManager { Logger.msg(5, "ClusterStorageManager.init() - Cluster storage " + newStorage.getClass().getName() +
" initialised successfully.");
allStores.put(newStorage.getId(), newStorage);
- clusterPriority[clusterNo++] = newStorage.getId();
+ clusterPriority[clusterNo++] = newStorage.getId();
}
clusterReaders.put(ClusterStorage.ROOT, rootStores); // all storages are queried for clusters at the root level
@@ -170,7 +157,7 @@ public class ClusterStorageManager { * Retrieves the ids of the next level of a cluster
* Does not look in any currently open transactions.
*/
- public String[] getClusterContents(Integer sysKey, String path) throws ClusterStorageException {
+ public String[] getClusterContents(ItemPath itemPath, String path) throws ClusterStorageException {
ArrayList<String> contents = new ArrayList<String>();
// get all readers
@@ -179,7 +166,7 @@ public class ClusterStorageManager { // try each in turn until we get a result
for (ClusterStorage thisReader : readers) {
try {
- String[] thisArr = thisReader.getClusterContents(sysKey, path);
+ String[] thisArr = thisReader.getClusterContents(itemPath, path);
if (thisArr != null) {
for (int j = 0; j < thisArr.length; j++)
if (!contents.contains(thisArr[j])) {
@@ -189,7 +176,7 @@ public class ClusterStorageManager { }
} catch (ClusterStorageException e) {
Logger.msg(5, "ClusterStorageManager.getClusterContents() - reader " + thisReader.getName() +
- " could not retrieve contents of " + sysKey + "/" + path + ": " + e.getMessage());
+ " could not retrieve contents of " + itemPath + "/" + path + ": " + e.getMessage());
}
}
@@ -199,16 +186,16 @@ public class ClusterStorageManager { }
/** Internal get method. Retrieves clusters from ClusterStorages & maintains the memory cache */
- public C2KLocalObject get(Integer sysKeyIntObj, String path) throws ClusterStorageException, ObjectNotFoundException {
+ public C2KLocalObject get(ItemPath itemPath, String path) throws ClusterStorageException, ObjectNotFoundException {
C2KLocalObject result = null;
// check cache first
Map<String, C2KLocalObject> sysKeyMemCache = null;
- sysKeyMemCache = memoryCache.get(sysKeyIntObj);
+ sysKeyMemCache = memoryCache.get(itemPath);
if (sysKeyMemCache != null) {
synchronized(sysKeyMemCache) {
C2KLocalObject obj = sysKeyMemCache.get(path);
if (obj != null) {
- Logger.msg(7, "ClusterStorageManager.get() - found "+sysKeyIntObj+"/"+path+" in memcache");
+ Logger.msg(7, "ClusterStorageManager.get() - found "+itemPath+"/"+path+" in memcache");
return obj;
}
}
@@ -220,7 +207,7 @@ public class ClusterStorageManager { StringTokenizer tok = new StringTokenizer(path,"/");
if (tok.countTokens() == 4) { // to not catch viewpoints called 'data'
Outcome data = null;
- Viewpoint view = (Viewpoint)get(sysKeyIntObj, path.substring(0, path.lastIndexOf("/")));
+ Viewpoint view = (Viewpoint)get(itemPath, path.substring(0, path.lastIndexOf("/")));
if (view != null)
data = view.getOutcome();
return data;
@@ -230,9 +217,9 @@ public class ClusterStorageManager { // deal out top level remote maps
if (path.indexOf('/') == -1) {
if (path.equals(ClusterStorage.HISTORY))
- result = new History(sysKeyIntObj, null);
+ result = new History(itemPath, null);
if (path.equals(ClusterStorage.JOB))
- result = new JobList(sysKeyIntObj, null);
+ result = new JobList(itemPath, null);
if (result!=null) {
synchronized(sysKeyMemCache) {
sysKeyMemCache.put(path, result);
@@ -246,16 +233,16 @@ public class ClusterStorageManager { ArrayList<ClusterStorage> readers = findStorages(ClusterStorage.getClusterType(path), false);
for (ClusterStorage thisReader : readers) {
try {
- result = thisReader.get(sysKeyIntObj, path);
- Logger.msg(7, "ClusterStorageManager.get() - reading "+path+" from "+thisReader.getName() + " for intkey=" + sysKeyIntObj);
+ result = thisReader.get(itemPath, path);
+ Logger.msg(7, "ClusterStorageManager.get() - reading "+path+" from "+thisReader.getName() + " for item " + itemPath);
if (result != null) { // got it!
// store it in the cache
if (sysKeyMemCache == null) { // create cache if needed
boolean useWeak = Gateway.getProperties().getBoolean("Storage.useWeakCache", false);
- Logger.msg(7,"ClusterStorageManager.put() - Creating "+(useWeak?"Weak":"Strong")+" cache for entity "+sysKeyIntObj);
+ Logger.msg(7,"ClusterStorageManager.put() - Creating "+(useWeak?"Weak":"Strong")+" cache for item "+itemPath);
sysKeyMemCache = useWeak?new WeakCache<String, C2KLocalObject>():new SoftCache<String, C2KLocalObject>(0);
synchronized (memoryCache) {
- memoryCache.put(sysKeyIntObj, sysKeyMemCache);
+ memoryCache.put(itemPath, sysKeyMemCache);
}
}
synchronized(sysKeyMemCache) {
@@ -265,37 +252,37 @@ public class ClusterStorageManager { return result;
}
} catch (ClusterStorageException e) {
- Logger.msg(7, "ClusterStorageManager.get() - reader " + thisReader.getName() + " could not retrieve " + sysKeyIntObj +
+ Logger.msg(7, "ClusterStorageManager.get() - reader " + thisReader.getName() + " could not retrieve " + itemPath +
"/" + path + ": " + e.getMessage());
}
}
- throw new ObjectNotFoundException("ClusterStorageManager.get() - Path " + path + " not found in " + sysKeyIntObj, "");
+ throw new ObjectNotFoundException("ClusterStorageManager.get() - Path " + path + " not found in " + itemPath, "");
}
/** Internal put method. Creates or overwrites a cluster in all writers. Used when committing transactions. */
- public void put(Integer sysKeyIntObj, C2KLocalObject obj) throws ClusterStorageException {
+ public void put(ItemPath itemPath, C2KLocalObject obj) throws ClusterStorageException {
String path = ClusterStorage.getPath(obj);
ArrayList<ClusterStorage> writers = findStorages(ClusterStorage.getClusterType(path), true);
for (ClusterStorage thisWriter : writers) {
try {
Logger.msg(7, "ClusterStorageManager.put() - writing "+path+" to "+thisWriter.getName());
- thisWriter.put(sysKeyIntObj, obj);
+ thisWriter.put(itemPath, obj);
} catch (ClusterStorageException e) {
Logger.error("ClusterStorageManager.put() - writer " + thisWriter.getName() + " could not store " +
- sysKeyIntObj + "/" + path + ": " + e.getMessage());
+ itemPath + "/" + path + ": " + e.getMessage());
throw e;
}
}
// put in mem cache if that worked
Map<String, C2KLocalObject> sysKeyMemCache;
- if (memoryCache.containsKey(sysKeyIntObj))
- sysKeyMemCache = memoryCache.get(sysKeyIntObj);
+ if (memoryCache.containsKey(itemPath))
+ sysKeyMemCache = memoryCache.get(itemPath);
else {
boolean useWeak = Gateway.getProperties().getBoolean("Storage.useWeakCache", false);
- Logger.msg(7,"ClusterStorageManager.put() - Creating "+(useWeak?"Weak":"Strong")+" cache for entity "+sysKeyIntObj);
+ Logger.msg(7,"ClusterStorageManager.put() - Creating "+(useWeak?"Weak":"Strong")+" cache for entity "+itemPath);
sysKeyMemCache = useWeak?new WeakCache<String, C2KLocalObject>():new SoftCache<String, C2KLocalObject>(0);
synchronized (memoryCache) {
- memoryCache.put(sysKeyIntObj, sysKeyMemCache);
+ memoryCache.put(itemPath, sysKeyMemCache);
}
}
@@ -306,45 +293,45 @@ public class ClusterStorageManager { if (Logger.doLog(9)) dumpCacheContents(9);
// transmit proxy event
- Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED));
+ Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(itemPath, path, ProxyMessage.ADDED));
}
/** Deletes a cluster from all writers */
- public void remove(Integer sysKeyIntObj, String path) throws ClusterStorageException {
+ public void remove(ItemPath itemPath, String path) throws ClusterStorageException {
ArrayList<ClusterStorage> writers = findStorages(ClusterStorage.getClusterType(path), true);
for (ClusterStorage thisWriter : writers) {
try {
Logger.msg(7, "ClusterStorageManager.delete() - removing "+path+" from "+thisWriter.getName());
- thisWriter.delete(sysKeyIntObj, path);
+ thisWriter.delete(itemPath, path);
} catch (ClusterStorageException e) {
- Logger.error("ClusterStorageManager.delete() - writer " + thisWriter.getName() + " could not delete " + sysKeyIntObj +
+ Logger.error("ClusterStorageManager.delete() - writer " + thisWriter.getName() + " could not delete " + itemPath +
"/" + path + ": " + e.getMessage());
throw e;
}
}
- if (memoryCache.containsKey(sysKeyIntObj)) {
- Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(sysKeyIntObj);
- synchronized (sysKeyMemCache) {
- sysKeyMemCache.remove(path);
+ if (memoryCache.containsKey(itemPath)) {
+ Map<String, C2KLocalObject> itemMemCache = memoryCache.get(itemPath);
+ synchronized (itemMemCache) {
+ itemMemCache.remove(path);
}
}
// transmit proxy event
- Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED));
+ Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(itemPath, path, ProxyMessage.DELETED));
}
- public void clearCache(Integer sysKeyIntObj, String path) {
- Logger.msg(7, "CSM.clearCache() - removing "+sysKeyIntObj+"/"+path);
+ public void clearCache(ItemPath itemPath, String path) {
+ Logger.msg(7, "CSM.clearCache() - removing "+itemPath+"/"+path);
- if (memoryCache.containsKey(sysKeyIntObj)) {
- Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(sysKeyIntObj);
+ if (memoryCache.containsKey(itemPath)) {
+ Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(itemPath);
synchronized(sysKeyMemCache) {
for (Iterator<String> iter = sysKeyMemCache.keySet().iterator(); iter.hasNext();) {
String thisPath = iter.next();
if (thisPath.startsWith(path)) {
- Logger.msg(7, "CSM.clearCache() - removing "+sysKeyIntObj+"/"+thisPath);
+ Logger.msg(7, "CSM.clearCache() - removing "+itemPath+"/"+thisPath);
iter.remove();
}
}
@@ -352,18 +339,18 @@ public class ClusterStorageManager { }
}
- public void clearCache(Integer sysKeyIntObj) {
+ public void clearCache(ItemPath itemPath) {
- Logger.msg(5, "CSM.clearCache() - removing entire cache of "+sysKeyIntObj);
+ Logger.msg(5, "CSM.clearCache() - removing entire cache of "+itemPath);
- if (memoryCache.containsKey(sysKeyIntObj)) {
+ if (memoryCache.containsKey(itemPath)) {
synchronized (memoryCache) {
if (Logger.doLog(6)) {
- Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(sysKeyIntObj);
+ Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(itemPath);
int size = sysKeyMemCache.size();
Logger.msg(6, "CSM.clearCache() - "+size+" objects to remove.");
}
- memoryCache.remove(sysKeyIntObj);
+ memoryCache.remove(itemPath);
}
}
else
@@ -380,9 +367,9 @@ public class ClusterStorageManager { public void dumpCacheContents(int logLevel) {
if (!Logger.doLog(logLevel)) return;
synchronized(memoryCache) {
- for (Integer sysKey : memoryCache.keySet()) {
- Logger.msg(logLevel, "Cached Objects of Entity "+sysKey);
- Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(sysKey);
+ for (ItemPath itemPath : memoryCache.keySet()) {
+ Logger.msg(logLevel, "Cached Objects of Entity "+itemPath);
+ Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(itemPath);
try {
synchronized(sysKeyMemCache) {
for (Object name : sysKeyMemCache.keySet()) {
|
