summaryrefslogtreecommitdiff
path: root/source/com/c2kernel/persistency/ClusterStorageManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'source/com/c2kernel/persistency/ClusterStorageManager.java')
-rwxr-xr-xsource/com/c2kernel/persistency/ClusterStorageManager.java369
1 files changed, 369 insertions, 0 deletions
diff --git a/source/com/c2kernel/persistency/ClusterStorageManager.java b/source/com/c2kernel/persistency/ClusterStorageManager.java
new file mode 100755
index 0000000..5309f33
--- /dev/null
+++ b/source/com/c2kernel/persistency/ClusterStorageManager.java
@@ -0,0 +1,369 @@
+package com.c2kernel.persistency;
+
+import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.entity.C2KLocalObject;
+import com.c2kernel.entity.proxy.EntityProxyManager;
+import com.c2kernel.entity.proxy.ProxyMessage;
+import com.c2kernel.persistency.outcome.Outcome;
+import com.c2kernel.persistency.outcome.Viewpoint;
+import com.c2kernel.process.Gateway;
+import com.c2kernel.utils.Logger;
+import com.c2kernel.utils.SoftCache;
+
+/**
+ * instantiates ClusterStorages listed in properties file All read/write requests to storage pass through this object, which
+ * can query the capabilities of each declared storage, and channel requests accordingly. Transaction based.
+ *
+ * * @version $Revision: 1.62 $ $Date: 2006/02/01 13:27:46 $
+ * @author $Author: abranson $
+ */
+public class ClusterStorageManager {
+ HashMap allStores = new HashMap();
+ String[] clusterPriority;
+ HashMap clusterWriters = new HashMap();
+ HashMap clusterReaders = new HashMap();
+ // we don't need a soft cache for the top level cache - the proxies and entities clear that when reaped
+ HashMap memoryCache = new HashMap();
+ boolean ready = false;
+
+ /**
+ * Initialises all ClusterStorage handlers listed by class name in the property "ClusterStorages"
+ * This property is usually process specific, and so should be in the server/client.conf and not the connect file.
+ */
+ public ClusterStorageManager() throws ClusterStorageException {
+ String allClusters = Gateway.getProperty("ClusterStorage");
+ if (allClusters == null || allClusters.equals("")) {
+ Logger.warning("ClusterStorageManager.init() - no ClusterStorages defined. No persistency!");
+ return;
+ }
+ StringTokenizer tok = new StringTokenizer(allClusters, ",");
+ clusterPriority = new String[tok.countTokens()];
+ int clusterNo = 0;
+ ArrayList rootStores = new ArrayList();
+ while (tok.hasMoreTokens()) {
+ ClusterStorage newStorage = null;
+ String newStorageClass = tok.nextToken();
+ try {
+ try {
+ newStorage = (ClusterStorage)(Class.forName(newStorageClass).newInstance());
+ } catch (ClassNotFoundException ex2) {
+ newStorage = (ClusterStorage)(Class.forName("com.c2kernel.persistency."+newStorageClass).newInstance());
+ }
+ newStorage.open();
+ Logger.msg(5, "ClusterStorageManager.init() - Cluster storage " + newStorageClass +
+ " initialised successfully.");
+ allStores.put(newStorage.getId(), newStorage);
+ rootStores.add(newStorage);
+ clusterPriority[clusterNo++] = newStorage.getId();
+
+ } catch (ClusterStorageException ex) {
+ Logger.error(ex);
+ throw new ClusterStorageException("ClusterStorageManager.init() - Error initialising storage handler " + newStorageClass +
+ ": " + ex.getMessage());
+ } catch (ClassNotFoundException ex) {
+ throw new ClusterStorageException("ClusterStorageManager.init() - The cluster storage handler class " + newStorageClass +
+ " could not be found.");
+ } catch (InstantiationException ex) {
+ throw new ClusterStorageException("ClusterStorageManager.init() - The cluster storage handler class " + newStorageClass +
+ " could not be instantiated.");
+ } catch (IllegalAccessException ex) {
+ throw new ClusterStorageException("ClusterStorageManager.init() - The cluster storage handler class " + newStorageClass +
+ " was not allowed to be instantiated.");
+ }
+ }
+ clusterReaders.put(ClusterStorage.ROOT, rootStores); // all storages are queried for clusters at the root level
+ ready = true;
+ }
+
+ public void close() {
+ for (Iterator iter = allStores.values().iterator(); iter.hasNext();) {
+ ClusterStorage thisStorage = (ClusterStorage)iter.next();
+ try {
+ thisStorage.close();
+ } catch (ClusterStorageException ex) {
+ Logger.error(ex);
+ }
+ }
+ ready = false;
+ }
+
+ /**
+ * Returns the loaded storage that declare that they can handle writing or reading the specified cluster name (e.g.
+ * Collection, Property) Must specify if the request is a read or a write.
+ */
+ private ArrayList findStorages(String clusterType, boolean forWrite) {
+
+ if (!ready) {
+ Logger.error("ClusterStorageManager.findStorages() - called before init!");
+ return null;
+ }
+
+ // choose the right cache for readers or writers
+ HashMap cache;
+ if (forWrite)
+ cache = clusterWriters;
+ else
+ cache = clusterReaders;
+
+ // check to see if we've been asked to do this before
+ if (cache.containsKey(clusterType))
+ return (ArrayList)cache.get(clusterType);
+
+ // not done yet, we'll have to query them all
+ Logger.msg(7, "ClusterStorageManager.findStorages() - finding storage for "+clusterType+" forWrite:"+forWrite);
+ ArrayList useableStorages = new ArrayList();
+ for (int i = 0; i < clusterPriority.length; i++) {
+ ClusterStorage thisStorage = (ClusterStorage)allStores.get(clusterPriority[i]);
+ short requiredSupport = forWrite ? ClusterStorage.WRITE : ClusterStorage.READ;
+ if ((thisStorage.queryClusterSupport(clusterType) & requiredSupport) == requiredSupport) {
+ Logger.msg(7, "ClusterStorageManager.findStorages() - Got "+thisStorage.getName());
+ useableStorages.add(thisStorage);
+ }
+ }
+ cache.put(clusterType, useableStorages);
+ return useableStorages;
+ }
+
+ /**
+ * Retrieves the ids of the next level of a cluster
+ * Does not look in any currently open transactions.
+ */
+ public String[] getClusterContents(Integer sysKey, String path) throws ClusterStorageException {
+ //String[] retArr = new String[0];
+ ArrayList contents = new ArrayList();
+ // get all readers
+ String type = ClusterStorage.getClusterType(path);
+ Logger.msg(8, "ClusterStorageManager.getClusterContents() - Finding contents of "+path);
+ ArrayList readers = findStorages(ClusterStorage.getClusterType(path), false);
+ // try each in turn until we get a result
+ for (Iterator i = readers.iterator(); i.hasNext();) {
+ ClusterStorage thisReader = (ClusterStorage)i.next();
+ try {
+ String[] thisArr = thisReader.getClusterContents(sysKey, path);
+ if (thisArr != null) {
+ for (int j = 0; j < thisArr.length; j++)
+ if (!contents.contains(thisArr[j])) {
+ Logger.msg(9, "ClusterStorageManager.getClusterContents() - "+thisReader.getName()+" reports "+thisArr[j]);
+ contents.add(thisArr[j]);
+ }
+ }
+ } catch (ClusterStorageException e) {
+ Logger.error("ClusterStorageManager.getClusterContents() - reader " + thisReader.getName() +
+ " could not retrieve contents of " + sysKey + "/" + path + ": " + e.getMessage());
+ }
+ }
+
+ String[] retArr = new String[0];
+ retArr = (String[])contents.toArray(retArr);
+ return retArr;
+ }
+
+ /** Internal get method. Retrieves clusters from ClusterStorages & maintains the memory cache */
+ public C2KLocalObject get(Integer sysKeyIntObj, String path) throws ClusterStorageException, ObjectNotFoundException {
+ C2KLocalObject result;
+ // check cache first
+ SoftCache sysKeyMemCache = null;
+ if (memoryCache.containsKey(sysKeyIntObj)) {
+ sysKeyMemCache = (SoftCache)memoryCache.get(sysKeyIntObj);
+ synchronized(sysKeyMemCache) {
+ C2KLocalObject obj = (C2KLocalObject)sysKeyMemCache.get(path);
+ if (obj != null) {
+ Logger.msg(7, "ClusterStorageManager.get() - found "+sysKeyIntObj+"/"+path+" in memcache");
+ return obj;
+ }
+ }
+ }
+
+ // special case - loading viewpoint contents
+ if (path.startsWith(ClusterStorage.VIEWPOINT) &&
+ path.endsWith("/data")) {
+ StringTokenizer tok = new StringTokenizer(path,"/");
+ if (tok.countTokens() == 4) { // to not catch viewpoints called 'data'
+ Outcome data = null;
+ Viewpoint view = (Viewpoint)get(sysKeyIntObj, path.substring(0, path.lastIndexOf("/")));
+ if (view != null)
+ data = view.getOutcome();
+ return data;
+ }
+ }
+
+ // else try each reader in turn until we find it
+ ArrayList readers = findStorages(ClusterStorage.getClusterType(path), false);
+ for (Iterator i = readers.iterator(); i.hasNext(); ) {
+ ClusterStorage thisReader = (ClusterStorage)i.next();
+ try {
+ result = thisReader.get(sysKeyIntObj, path);
+ Logger.msg(7, "ClusterStorageManager.get() - reading "+path+" from "+thisReader.getName() + " for intkey=" + sysKeyIntObj);
+ if (result != null) { // got it!
+ // store it in the cache
+ if (sysKeyMemCache == null) { // create cache if needed
+ sysKeyMemCache = new SoftCache(0);
+ synchronized (memoryCache) {
+ memoryCache.put(sysKeyIntObj, sysKeyMemCache);
+ }
+ }
+ synchronized(sysKeyMemCache) {
+ sysKeyMemCache.put(path, result);
+ }
+ // then return it
+ return result;
+ }
+ } catch (ClusterStorageException e) {
+ Logger.msg(7, "ClusterStorageManager.get() - reader " + thisReader.getName() + " could not retrieve " + sysKeyIntObj +
+ "/" + path + ": " + e.getMessage());
+ }
+ }
+ throw new ObjectNotFoundException("ClusterStorageManager.get() - Path " + path + " not found in " + sysKeyIntObj, "");
+ }
+
+ /** Internal put method. Creates or overwrites a cluster in all writers. Used when committing transactions. */
+ public void put(Integer sysKeyIntObj, C2KLocalObject obj) throws ClusterStorageException {
+ String path = ClusterStorage.getPath(obj);
+ ArrayList writers = findStorages(ClusterStorage.getClusterType(path), true);
+ for (Iterator i = writers.iterator(); i.hasNext(); ) {
+ ClusterStorage thisWriter = (ClusterStorage)i.next();
+ try {
+ Logger.msg(7, "ClusterStorageManager.put() - writing "+path+" to "+thisWriter.getName());
+ thisWriter.put(sysKeyIntObj, obj);
+ } catch (ClusterStorageException e) {
+ Logger.error("ClusterStorageManager.put() - writer " + thisWriter.getName() + " could not store " +
+ sysKeyIntObj + "/" + path + ": " + e.getMessage());
+ throw e;
+ }
+ }
+ // put in mem cache if that worked
+ SoftCache sysKeyMemCache;
+ if (memoryCache.containsKey(sysKeyIntObj))
+ sysKeyMemCache = (SoftCache)memoryCache.get(sysKeyIntObj);
+ else {
+ sysKeyMemCache = new SoftCache();
+ synchronized (memoryCache) {
+ memoryCache.put(sysKeyIntObj, sysKeyMemCache);
+ }
+ }
+
+ synchronized(sysKeyMemCache) {
+ sysKeyMemCache.put(path, obj);
+ }
+
+ if (Logger.doLog(9)) dumpCacheContents(9);
+
+ // transmit proxy event
+ EntityProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED));
+ }
+
+ /** Deletes a cluster from all writers */
+ public void remove(Integer sysKeyIntObj, String path) throws ClusterStorageException {
+ ArrayList writers = findStorages(ClusterStorage.getClusterType(path), true);
+ for (Iterator i = writers.iterator(); i.hasNext(); ) {
+ ClusterStorage thisWriter = (ClusterStorage)i.next();
+ try {
+ Logger.msg(7, "ClusterStorageManager.delete() - removing "+path+" from "+thisWriter.getName());
+ thisWriter.delete(sysKeyIntObj, path);
+ } catch (ClusterStorageException e) {
+ Logger.error("ClusterStorageManager.delete() - writer " + thisWriter.getName() + " could not delete " + sysKeyIntObj +
+ "/" + path + ": " + e.getMessage());
+ throw e;
+ }
+ }
+
+ if (memoryCache.containsKey(sysKeyIntObj)) {
+ SoftCache sysKeyMemCache = (SoftCache)memoryCache.get(sysKeyIntObj);
+ synchronized (sysKeyMemCache) {
+ sysKeyMemCache.remove(path);
+ }
+ }
+
+
+ // transmit proxy event
+ EntityProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED));
+ }
+
+ public void clearCache(Integer sysKeyIntObj, String path) {
+ Logger.msg(7, "CSM.clearCache() - removing "+sysKeyIntObj+"/"+path);
+
+ if (memoryCache.containsKey(sysKeyIntObj)) {
+ SoftCache sysKeyMemCache = (SoftCache)memoryCache.get(sysKeyIntObj);
+ synchronized(sysKeyMemCache) {
+ for (Iterator iter = sysKeyMemCache.keySet().iterator(); iter.hasNext();) {
+ String thisPath = (String)iter.next();
+ if (thisPath.startsWith(path)) {
+ Logger.msg(7, "CSM.clearCache() - removing "+sysKeyIntObj+"/"+thisPath);
+ iter.remove();
+ }
+ }
+ }
+ }
+ }
+
+ public void clearCache(Integer sysKeyIntObj) {
+
+ Logger.msg(5, "CSM.clearCache() - removing entire cache of "+sysKeyIntObj);
+
+ if (memoryCache.containsKey(sysKeyIntObj)) {
+ synchronized (memoryCache) {
+ if (Logger.doLog(6)) {
+ SoftCache sysKeyMemCache = (SoftCache)memoryCache.get(sysKeyIntObj);
+ int size = sysKeyMemCache.size();
+ Logger.msg(6, "CSM.clearCache() - "+size+" objects to remove.");
+ }
+ memoryCache.remove(sysKeyIntObj);
+ }
+ }
+ else
+ Logger.msg(6, "CSM.clearCache() - No objects cached");
+ }
+
+ public void clearCache() {
+ synchronized (memoryCache) {
+ memoryCache.clear();
+ }
+ Logger.msg(5, "CSM.clearCache() - cleared entire cache, "+memoryCache.size()+" entities.");
+ }
+
+ public void dumpCacheContents(int logLevel) {
+ if (!Logger.doLog(logLevel)) return;
+ synchronized(memoryCache) {
+ for (Iterator iter = memoryCache.keySet().iterator(); iter.hasNext();) {
+ Integer sysKey = (Integer) iter.next();
+ Logger.msg(logLevel, "Cached Objects of Entity "+sysKey);
+ SoftCache sysKeyMemCache = (SoftCache)memoryCache.get(sysKey);
+ try {
+ synchronized(sysKeyMemCache) {
+ for (Iterator iterator = sysKeyMemCache.keySet().iterator();iterator.hasNext();) {
+ String path = (String) iterator.next();
+ try {
+ Logger.msg(logLevel, " Path "+path+": "+sysKeyMemCache.get(path).getClass().getName());
+ } catch (NullPointerException e) {
+ Logger.msg(logLevel, " Path "+path+": reaped");
+ }
+ }
+ }
+ } catch (ConcurrentModificationException ex) {
+ Logger.msg(logLevel, "Cache modified - aborting");
+ }
+ }
+ Logger.msg(logLevel, "Total number of cached entities: "+memoryCache.size());
+ }
+ }
+
+ public Object query(String id, Object query) throws ClusterStorageException {
+ ClusterStorage requiredStorage = (ClusterStorage)allStores.get(id);
+ if (requiredStorage == null)
+ throw new ClusterStorageException("Storage "+id+" not found.");
+ return requiredStorage.query(query);
+ }
+
+ public String queryToXML(String id, String query, boolean genericFormat) throws ClusterStorageException {
+ ClusterStorage requiredStorage = (ClusterStorage)allStores.get(id);
+ if (requiredStorage == null)
+ throw new ClusterStorageException("Storage "+id+" not found.");
+ return requiredStorage.queryToXML(query, genericFormat);
+ }
+}