From b086f57f56bf0eb9dab9cf321a0f69aaaae84347 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 30 May 2012 08:37:45 +0200 Subject: Initial Maven Conversion --- .../persistency/ClusterStorageManager.java | 379 +++++++++++++++++++++ 1 file changed, 379 insertions(+) create mode 100644 src/main/java/com/c2kernel/persistency/ClusterStorageManager.java (limited to 'src/main/java/com/c2kernel/persistency/ClusterStorageManager.java') diff --git a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java new file mode 100644 index 0000000..756ac4d --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java @@ -0,0 +1,379 @@ +package com.c2kernel.persistency; + +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.StringTokenizer; + +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.entity.agent.JobList; +import com.c2kernel.entity.proxy.EntityProxyManager; +import com.c2kernel.entity.proxy.ProxyMessage; +import com.c2kernel.events.History; +import com.c2kernel.persistency.outcome.Outcome; +import com.c2kernel.persistency.outcome.Viewpoint; +import com.c2kernel.process.Gateway; +import com.c2kernel.utils.Logger; +import com.c2kernel.utils.SoftCache; + +/** + * instantiates ClusterStorages listed in properties file All read/write requests to storage pass through this object, which + * can query the capabilities of each declared storage, and channel requests accordingly. Transaction based. + * + * * @version $Revision: 1.62 $ $Date: 2006/02/01 13:27:46 $ + * @author $Author: abranson $ + */ +public class ClusterStorageManager { + HashMap allStores = new HashMap(); + String[] clusterPriority; + HashMap> clusterWriters = new HashMap>(); + HashMap> clusterReaders = new HashMap>(); + // we don't need a soft cache for the top level cache - the proxies and entities clear that when reaped + HashMap> memoryCache = new HashMap>(); + boolean ready = false; + + /** + * Initialises all ClusterStorage handlers listed by class name in the property "ClusterStorages" + * This property is usually process specific, and so should be in the server/client.conf and not the connect file. + */ + public ClusterStorageManager() throws ClusterStorageException { + String allClusters = Gateway.getProperty("ClusterStorage"); + if (allClusters == null || allClusters.equals("")) { + Logger.warning("ClusterStorageManager.init() - no ClusterStorages defined. No persistency!"); + return; + } + StringTokenizer tok = new StringTokenizer(allClusters, ","); + clusterPriority = new String[tok.countTokens()]; + int clusterNo = 0; + ArrayList rootStores = new ArrayList(); + while (tok.hasMoreTokens()) { + ClusterStorage newStorage = null; + String newStorageClass = tok.nextToken(); + try { + try { + newStorage = (ClusterStorage)(Class.forName(newStorageClass).newInstance()); + } catch (ClassNotFoundException ex2) { + newStorage = (ClusterStorage)(Class.forName("com.c2kernel.persistency."+newStorageClass).newInstance()); + } + newStorage.open(); + Logger.msg(5, "ClusterStorageManager.init() - Cluster storage " + newStorageClass + + " initialised successfully."); + allStores.put(newStorage.getId(), newStorage); + rootStores.add(newStorage); + clusterPriority[clusterNo++] = newStorage.getId(); + + } catch (ClusterStorageException ex) { + Logger.error(ex); + throw new ClusterStorageException("ClusterStorageManager.init() - Error initialising storage handler " + newStorageClass + + ": " + ex.getMessage()); + } catch (ClassNotFoundException ex) { + throw new ClusterStorageException("ClusterStorageManager.init() - The cluster storage handler class " + newStorageClass + + " could not be found."); + } catch (InstantiationException ex) { + throw new ClusterStorageException("ClusterStorageManager.init() - The cluster storage handler class " + newStorageClass + + " could not be instantiated."); + } catch (IllegalAccessException ex) { + throw new ClusterStorageException("ClusterStorageManager.init() - The cluster storage handler class " + newStorageClass + + " was not allowed to be instantiated."); + } + } + clusterReaders.put(ClusterStorage.ROOT, rootStores); // all storages are queried for clusters at the root level + ready = true; + } + + public void close() { + for (ClusterStorage thisStorage : allStores.values()) { + try { + thisStorage.close(); + } catch (ClusterStorageException ex) { + Logger.error(ex); + } + } + ready = false; + } + + /** + * Returns the loaded storage that declare that they can handle writing or reading the specified cluster name (e.g. + * Collection, Property) Must specify if the request is a read or a write. + */ + private ArrayList findStorages(String clusterType, boolean forWrite) { + + if (!ready) { + Logger.error("ClusterStorageManager.findStorages() - called before init!"); + return null; + } + + // choose the right cache for readers or writers + HashMap> cache; + if (forWrite) + cache = clusterWriters; + else + cache = clusterReaders; + + // check to see if we've been asked to do this before + if (cache.containsKey(clusterType)) + return cache.get(clusterType); + + // not done yet, we'll have to query them all + Logger.msg(7, "ClusterStorageManager.findStorages() - finding storage for "+clusterType+" forWrite:"+forWrite); + ArrayList useableStorages = new ArrayList(); + for (String element : clusterPriority) { + ClusterStorage thisStorage = allStores.get(element); + short requiredSupport = forWrite ? ClusterStorage.WRITE : ClusterStorage.READ; + if ((thisStorage.queryClusterSupport(clusterType) & requiredSupport) == requiredSupport) { + Logger.msg(7, "ClusterStorageManager.findStorages() - Got "+thisStorage.getName()); + useableStorages.add(thisStorage); + } + } + cache.put(clusterType, useableStorages); + return useableStorages; + } + + /** + * Retrieves the ids of the next level of a cluster + * Does not look in any currently open transactions. + */ + public String[] getClusterContents(Integer sysKey, String path) throws ClusterStorageException { + //String[] retArr = new String[0]; + ArrayList contents = new ArrayList(); + // get all readers + Logger.msg(8, "ClusterStorageManager.getClusterContents() - Finding contents of "+path); + ArrayList readers = findStorages(ClusterStorage.getClusterType(path), false); + // try each in turn until we get a result + for (ClusterStorage thisReader : readers) { + try { + String[] thisArr = thisReader.getClusterContents(sysKey, path); + if (thisArr != null) { + for (int j = 0; j < thisArr.length; j++) + if (!contents.contains(thisArr[j])) { + Logger.msg(9, "ClusterStorageManager.getClusterContents() - "+thisReader.getName()+" reports "+thisArr[j]); + contents.add(thisArr[j]); + } + } + } catch (ClusterStorageException e) { + Logger.error("ClusterStorageManager.getClusterContents() - reader " + thisReader.getName() + + " could not retrieve contents of " + sysKey + "/" + path + ": " + e.getMessage()); + } + } + + String[] retArr = new String[0]; + retArr = contents.toArray(retArr); + return retArr; + } + + /** Internal get method. Retrieves clusters from ClusterStorages & maintains the memory cache */ + public C2KLocalObject get(Integer sysKeyIntObj, String path) throws ClusterStorageException, ObjectNotFoundException { + C2KLocalObject result = null; + // check cache first + SoftCache sysKeyMemCache = null; + if (memoryCache.containsKey(sysKeyIntObj)) { + sysKeyMemCache = memoryCache.get(sysKeyIntObj); + synchronized(sysKeyMemCache) { + C2KLocalObject obj = sysKeyMemCache.get(path); + if (obj != null) { + Logger.msg(7, "ClusterStorageManager.get() - found "+sysKeyIntObj+"/"+path+" in memcache"); + return obj; + } + } + } + + // special case - loading viewpoint contents + if (path.startsWith(ClusterStorage.VIEWPOINT) && + path.endsWith("/data")) { + StringTokenizer tok = new StringTokenizer(path,"/"); + if (tok.countTokens() == 4) { // to not catch viewpoints called 'data' + Outcome data = null; + Viewpoint view = (Viewpoint)get(sysKeyIntObj, path.substring(0, path.lastIndexOf("/"))); + if (view != null) + data = view.getOutcome(); + return data; + } + } + + // deal out top level remote maps + if (path.indexOf('/') == -1) { + if (path.equals(ClusterStorage.HISTORY)) + result = new History(sysKeyIntObj, null); + if (path.equals(ClusterStorage.JOB)) + result = new JobList(sysKeyIntObj, null); + if (result!=null) { + synchronized(sysKeyMemCache) { + sysKeyMemCache.put(path, result); + } + return result; + } + + } + + // else try each reader in turn until we find it + ArrayList readers = findStorages(ClusterStorage.getClusterType(path), false); + for (ClusterStorage thisReader : readers) { + try { + result = thisReader.get(sysKeyIntObj, path); + Logger.msg(7, "ClusterStorageManager.get() - reading "+path+" from "+thisReader.getName() + " for intkey=" + sysKeyIntObj); + if (result != null) { // got it! + // store it in the cache + if (sysKeyMemCache == null) { // create cache if needed + sysKeyMemCache = new SoftCache(0); + synchronized (memoryCache) { + memoryCache.put(sysKeyIntObj, sysKeyMemCache); + } + } + synchronized(sysKeyMemCache) { + sysKeyMemCache.put(path, result); + } + // then return it + return result; + } + } catch (ClusterStorageException e) { + Logger.msg(7, "ClusterStorageManager.get() - reader " + thisReader.getName() + " could not retrieve " + sysKeyIntObj + + "/" + path + ": " + e.getMessage()); + } + } + throw new ObjectNotFoundException("ClusterStorageManager.get() - Path " + path + " not found in " + sysKeyIntObj, ""); + } + + /** Internal put method. Creates or overwrites a cluster in all writers. Used when committing transactions. */ + public void put(Integer sysKeyIntObj, C2KLocalObject obj) throws ClusterStorageException { + String path = ClusterStorage.getPath(obj); + ArrayList writers = findStorages(ClusterStorage.getClusterType(path), true); + for (ClusterStorage thisWriter : writers) { + try { + Logger.msg(7, "ClusterStorageManager.put() - writing "+path+" to "+thisWriter.getName()); + thisWriter.put(sysKeyIntObj, obj); + } catch (ClusterStorageException e) { + Logger.error("ClusterStorageManager.put() - writer " + thisWriter.getName() + " could not store " + + sysKeyIntObj + "/" + path + ": " + e.getMessage()); + throw e; + } + } + // put in mem cache if that worked + SoftCache sysKeyMemCache; + if (memoryCache.containsKey(sysKeyIntObj)) + sysKeyMemCache = memoryCache.get(sysKeyIntObj); + else { + sysKeyMemCache = new SoftCache(); + synchronized (memoryCache) { + memoryCache.put(sysKeyIntObj, sysKeyMemCache); + } + } + + synchronized(sysKeyMemCache) { + sysKeyMemCache.put(path, obj); + } + + if (Logger.doLog(9)) dumpCacheContents(9); + + // transmit proxy event + EntityProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED)); + } + + /** Deletes a cluster from all writers */ + public void remove(Integer sysKeyIntObj, String path) throws ClusterStorageException { + ArrayList writers = findStorages(ClusterStorage.getClusterType(path), true); + for (ClusterStorage thisWriter : writers) { + try { + Logger.msg(7, "ClusterStorageManager.delete() - removing "+path+" from "+thisWriter.getName()); + thisWriter.delete(sysKeyIntObj, path); + } catch (ClusterStorageException e) { + Logger.error("ClusterStorageManager.delete() - writer " + thisWriter.getName() + " could not delete " + sysKeyIntObj + + "/" + path + ": " + e.getMessage()); + throw e; + } + } + + if (memoryCache.containsKey(sysKeyIntObj)) { + SoftCache sysKeyMemCache = memoryCache.get(sysKeyIntObj); + synchronized (sysKeyMemCache) { + sysKeyMemCache.remove(path); + } + } + + + // transmit proxy event + EntityProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED)); + } + + public void clearCache(Integer sysKeyIntObj, String path) { + Logger.msg(7, "CSM.clearCache() - removing "+sysKeyIntObj+"/"+path); + + if (memoryCache.containsKey(sysKeyIntObj)) { + SoftCache sysKeyMemCache = memoryCache.get(sysKeyIntObj); + synchronized(sysKeyMemCache) { + for (Iterator iter = sysKeyMemCache.keySet().iterator(); iter.hasNext();) { + String thisPath = iter.next(); + if (thisPath.startsWith(path)) { + Logger.msg(7, "CSM.clearCache() - removing "+sysKeyIntObj+"/"+thisPath); + iter.remove(); + } + } + } + } + } + + public void clearCache(Integer sysKeyIntObj) { + + Logger.msg(5, "CSM.clearCache() - removing entire cache of "+sysKeyIntObj); + + if (memoryCache.containsKey(sysKeyIntObj)) { + synchronized (memoryCache) { + if (Logger.doLog(6)) { + SoftCache sysKeyMemCache = memoryCache.get(sysKeyIntObj); + int size = sysKeyMemCache.size(); + Logger.msg(6, "CSM.clearCache() - "+size+" objects to remove."); + } + memoryCache.remove(sysKeyIntObj); + } + } + else + Logger.msg(6, "CSM.clearCache() - No objects cached"); + } + + public void clearCache() { + synchronized (memoryCache) { + memoryCache.clear(); + } + Logger.msg(5, "CSM.clearCache() - cleared entire cache, "+memoryCache.size()+" entities."); + } + + public void dumpCacheContents(int logLevel) { + if (!Logger.doLog(logLevel)) return; + synchronized(memoryCache) { + for (Integer sysKey : memoryCache.keySet()) { + Logger.msg(logLevel, "Cached Objects of Entity "+sysKey); + SoftCache sysKeyMemCache = memoryCache.get(sysKey); + try { + synchronized(sysKeyMemCache) { + for (Object name : sysKeyMemCache.keySet()) { + String path = (String) name; + try { + Logger.msg(logLevel, " Path "+path+": "+sysKeyMemCache.get(path).getClass().getName()); + } catch (NullPointerException e) { + Logger.msg(logLevel, " Path "+path+": reaped"); + } + } + } + } catch (ConcurrentModificationException ex) { + Logger.msg(logLevel, "Cache modified - aborting"); + } + } + Logger.msg(logLevel, "Total number of cached entities: "+memoryCache.size()); + } + } + + public Object query(String id, Object query) throws ClusterStorageException { + ClusterStorage requiredStorage = allStores.get(id); + if (requiredStorage == null) + throw new ClusterStorageException("Storage "+id+" not found."); + return requiredStorage.query(query); + } + + public String queryToXML(String id, String query, boolean genericFormat) throws ClusterStorageException { + ClusterStorage requiredStorage = allStores.get(id); + if (requiredStorage == null) + throw new ClusterStorageException("Storage "+id+" not found."); + return requiredStorage.queryToXML(query, genericFormat); + } +} -- cgit v1.2.3