From b086f57f56bf0eb9dab9cf321a0f69aaaae84347 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 30 May 2012 08:37:45 +0200 Subject: Initial Maven Conversion --- .../com/c2kernel/persistency/ClusterStorage.java | 104 ++++++ .../persistency/ClusterStorageException.java | 17 + .../persistency/ClusterStorageManager.java | 379 +++++++++++++++++++++ .../com/c2kernel/persistency/LDAPClientReader.java | 43 +++ .../c2kernel/persistency/LDAPClusterStorage.java | 172 ++++++++++ .../java/com/c2kernel/persistency/ProxyLoader.java | 133 ++++++++ .../java/com/c2kernel/persistency/RemoteMap.java | 374 ++++++++++++++++++++ .../c2kernel/persistency/TransactionManager.java | 324 ++++++++++++++++++ .../c2kernel/persistency/XMLClusterStorage.java | 154 +++++++++ .../com/c2kernel/persistency/outcome/Outcome.java | 177 ++++++++++ .../persistency/outcome/OutcomeValidator.java | 188 ++++++++++ .../com/c2kernel/persistency/outcome/Schema.java | 18 + .../persistency/outcome/SchemaValidator.java | 55 +++ .../c2kernel/persistency/outcome/Viewpoint.java | 180 ++++++++++ 14 files changed, 2318 insertions(+) create mode 100644 src/main/java/com/c2kernel/persistency/ClusterStorage.java create mode 100644 src/main/java/com/c2kernel/persistency/ClusterStorageException.java create mode 100644 src/main/java/com/c2kernel/persistency/ClusterStorageManager.java create mode 100644 src/main/java/com/c2kernel/persistency/LDAPClientReader.java create mode 100644 src/main/java/com/c2kernel/persistency/LDAPClusterStorage.java create mode 100644 src/main/java/com/c2kernel/persistency/ProxyLoader.java create mode 100644 src/main/java/com/c2kernel/persistency/RemoteMap.java create mode 100644 src/main/java/com/c2kernel/persistency/TransactionManager.java create mode 100644 src/main/java/com/c2kernel/persistency/XMLClusterStorage.java create mode 100644 src/main/java/com/c2kernel/persistency/outcome/Outcome.java create mode 100644 src/main/java/com/c2kernel/persistency/outcome/OutcomeValidator.java create mode 100644 src/main/java/com/c2kernel/persistency/outcome/Schema.java create mode 100644 src/main/java/com/c2kernel/persistency/outcome/SchemaValidator.java create mode 100644 src/main/java/com/c2kernel/persistency/outcome/Viewpoint.java (limited to 'src/main/java/com/c2kernel/persistency') diff --git a/src/main/java/com/c2kernel/persistency/ClusterStorage.java b/src/main/java/com/c2kernel/persistency/ClusterStorage.java new file mode 100644 index 0000000..80fd86d --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/ClusterStorage.java @@ -0,0 +1,104 @@ + +package com.c2kernel.persistency; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.persistency.outcome.Outcome; +import com.c2kernel.persistency.outcome.Viewpoint; +import com.c2kernel.utils.Logger; + +/** Interface for persistency managers of entities. + A Cluster is defined as a path under the item + Each ClusterStorage must support get() and getClusterContents() for clusters they return READ and READWRITE from queryClusterSupport + and put() and delete() for clusters they return WRITE and READWRITE from queryClusterSupport(). + Unsupported operations should throw a ClusterStorageException. + If a cluster does not exist, get should return null, and delete should return + @version $Revision: 1.22 $ $Date: 2006/02/01 13:27:47 $ + @author $Author: abranson $ +*/ +public abstract class ClusterStorage { + + public static final short NONE = 0; + public static final short READ = 1; + public static final short WRITE = 2; + public static final short READWRITE = 3; + + // Cluster types + public static final String ROOT = ""; + public static final String PROPERTY = "Property"; + public static final String COLLECTION = "Collection"; + public static final String LIFECYCLE = "LifeCycle"; + public static final String OUTCOME = "Outcome"; + public static final String HISTORY = "AuditTrail"; + public static final String VIEWPOINT = "ViewPoint"; + public static final String JOB = "Job"; + + // connection maintenance + public abstract void open() + throws ClusterStorageException; + public abstract void close() + throws ClusterStorageException; + + // introspection + public abstract short queryClusterSupport(String clusterType); + public abstract String getName(); + // for addressing queries + public abstract String getId(); + + + /** Quickly gets the first string of the slashed path */ + public static String getClusterType(String path) { + try { + if (path == null || path.length() == 0) return ClusterStorage.ROOT; + int start = path.charAt(0) == '/' ? 1 : 0; + int end = path.indexOf('/', start + 1); + if (end == -1) end = path.length(); + return path.substring(start, end); + } catch (Exception ex) { + Logger.error(ex); + return ClusterStorage.ROOT; + } + } + + public static String getPath(C2KLocalObject obj) { + String root = obj.getClusterType(); + if (root == null) return null; // no storage allowed + if (obj instanceof Outcome) { + Outcome oc = (Outcome)obj; + return root+"/"+oc.getSchemaType()+"/"+oc.getSchemaVersion()+"/"+oc.getName(); + } + else if (obj instanceof Viewpoint) { + Viewpoint vp = (Viewpoint)obj; + return root+"/"+vp.getSchemaName()+"/"+vp.getName(); + } + else + return root+"/"+obj.getName(); + } + + /* object manipulation */ + + // retrieve object by path + public abstract C2KLocalObject get(Integer sysKey, String path) + throws ClusterStorageException; + // store object by path + public abstract void put(Integer sysKey, C2KLocalObject obj) + throws ClusterStorageException; + // delete cluster + public abstract void delete(Integer sysKey, String path) + throws ClusterStorageException; + + // db specific queries + public Object query(Object query) + throws ClusterStorageException { + throw new ClusterStorageException("Query not supported on this storage"); + } + + public String queryToXML(String query, boolean genericFormat) + throws ClusterStorageException { + throw new ClusterStorageException("Query not supported on this storage"); + } + + + // directory listing + public abstract String[] getClusterContents(Integer sysKey, String path) + throws ClusterStorageException; + +} diff --git a/src/main/java/com/c2kernel/persistency/ClusterStorageException.java b/src/main/java/com/c2kernel/persistency/ClusterStorageException.java new file mode 100644 index 0000000..b51982c --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/ClusterStorageException.java @@ -0,0 +1,17 @@ +package com.c2kernel.persistency; + +/** + * + * @version $Revision: 1.2 $ $Date: 2003/07/14 07:57:06 $ + * @author $Author: abranson $ + */ + +public class ClusterStorageException extends Exception { + public ClusterStorageException() { + super(); + } + public ClusterStorageException(String s) { + super(s); + } + +} diff --git a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java new file mode 100644 index 0000000..756ac4d --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java @@ -0,0 +1,379 @@ +package com.c2kernel.persistency; + +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.StringTokenizer; + +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.entity.agent.JobList; +import com.c2kernel.entity.proxy.EntityProxyManager; +import com.c2kernel.entity.proxy.ProxyMessage; +import com.c2kernel.events.History; +import com.c2kernel.persistency.outcome.Outcome; +import com.c2kernel.persistency.outcome.Viewpoint; +import com.c2kernel.process.Gateway; +import com.c2kernel.utils.Logger; +import com.c2kernel.utils.SoftCache; + +/** + * instantiates ClusterStorages listed in properties file All read/write requests to storage pass through this object, which + * can query the capabilities of each declared storage, and channel requests accordingly. Transaction based. + * + * * @version $Revision: 1.62 $ $Date: 2006/02/01 13:27:46 $ + * @author $Author: abranson $ + */ +public class ClusterStorageManager { + HashMap allStores = new HashMap(); + String[] clusterPriority; + HashMap> clusterWriters = new HashMap>(); + HashMap> clusterReaders = new HashMap>(); + // we don't need a soft cache for the top level cache - the proxies and entities clear that when reaped + HashMap> memoryCache = new HashMap>(); + boolean ready = false; + + /** + * Initialises all ClusterStorage handlers listed by class name in the property "ClusterStorages" + * This property is usually process specific, and so should be in the server/client.conf and not the connect file. + */ + public ClusterStorageManager() throws ClusterStorageException { + String allClusters = Gateway.getProperty("ClusterStorage"); + if (allClusters == null || allClusters.equals("")) { + Logger.warning("ClusterStorageManager.init() - no ClusterStorages defined. No persistency!"); + return; + } + StringTokenizer tok = new StringTokenizer(allClusters, ","); + clusterPriority = new String[tok.countTokens()]; + int clusterNo = 0; + ArrayList rootStores = new ArrayList(); + while (tok.hasMoreTokens()) { + ClusterStorage newStorage = null; + String newStorageClass = tok.nextToken(); + try { + try { + newStorage = (ClusterStorage)(Class.forName(newStorageClass).newInstance()); + } catch (ClassNotFoundException ex2) { + newStorage = (ClusterStorage)(Class.forName("com.c2kernel.persistency."+newStorageClass).newInstance()); + } + newStorage.open(); + Logger.msg(5, "ClusterStorageManager.init() - Cluster storage " + newStorageClass + + " initialised successfully."); + allStores.put(newStorage.getId(), newStorage); + rootStores.add(newStorage); + clusterPriority[clusterNo++] = newStorage.getId(); + + } catch (ClusterStorageException ex) { + Logger.error(ex); + throw new ClusterStorageException("ClusterStorageManager.init() - Error initialising storage handler " + newStorageClass + + ": " + ex.getMessage()); + } catch (ClassNotFoundException ex) { + throw new ClusterStorageException("ClusterStorageManager.init() - The cluster storage handler class " + newStorageClass + + " could not be found."); + } catch (InstantiationException ex) { + throw new ClusterStorageException("ClusterStorageManager.init() - The cluster storage handler class " + newStorageClass + + " could not be instantiated."); + } catch (IllegalAccessException ex) { + throw new ClusterStorageException("ClusterStorageManager.init() - The cluster storage handler class " + newStorageClass + + " was not allowed to be instantiated."); + } + } + clusterReaders.put(ClusterStorage.ROOT, rootStores); // all storages are queried for clusters at the root level + ready = true; + } + + public void close() { + for (ClusterStorage thisStorage : allStores.values()) { + try { + thisStorage.close(); + } catch (ClusterStorageException ex) { + Logger.error(ex); + } + } + ready = false; + } + + /** + * Returns the loaded storage that declare that they can handle writing or reading the specified cluster name (e.g. + * Collection, Property) Must specify if the request is a read or a write. + */ + private ArrayList findStorages(String clusterType, boolean forWrite) { + + if (!ready) { + Logger.error("ClusterStorageManager.findStorages() - called before init!"); + return null; + } + + // choose the right cache for readers or writers + HashMap> cache; + if (forWrite) + cache = clusterWriters; + else + cache = clusterReaders; + + // check to see if we've been asked to do this before + if (cache.containsKey(clusterType)) + return cache.get(clusterType); + + // not done yet, we'll have to query them all + Logger.msg(7, "ClusterStorageManager.findStorages() - finding storage for "+clusterType+" forWrite:"+forWrite); + ArrayList useableStorages = new ArrayList(); + for (String element : clusterPriority) { + ClusterStorage thisStorage = allStores.get(element); + short requiredSupport = forWrite ? ClusterStorage.WRITE : ClusterStorage.READ; + if ((thisStorage.queryClusterSupport(clusterType) & requiredSupport) == requiredSupport) { + Logger.msg(7, "ClusterStorageManager.findStorages() - Got "+thisStorage.getName()); + useableStorages.add(thisStorage); + } + } + cache.put(clusterType, useableStorages); + return useableStorages; + } + + /** + * Retrieves the ids of the next level of a cluster + * Does not look in any currently open transactions. + */ + public String[] getClusterContents(Integer sysKey, String path) throws ClusterStorageException { + //String[] retArr = new String[0]; + ArrayList contents = new ArrayList(); + // get all readers + Logger.msg(8, "ClusterStorageManager.getClusterContents() - Finding contents of "+path); + ArrayList readers = findStorages(ClusterStorage.getClusterType(path), false); + // try each in turn until we get a result + for (ClusterStorage thisReader : readers) { + try { + String[] thisArr = thisReader.getClusterContents(sysKey, path); + if (thisArr != null) { + for (int j = 0; j < thisArr.length; j++) + if (!contents.contains(thisArr[j])) { + Logger.msg(9, "ClusterStorageManager.getClusterContents() - "+thisReader.getName()+" reports "+thisArr[j]); + contents.add(thisArr[j]); + } + } + } catch (ClusterStorageException e) { + Logger.error("ClusterStorageManager.getClusterContents() - reader " + thisReader.getName() + + " could not retrieve contents of " + sysKey + "/" + path + ": " + e.getMessage()); + } + } + + String[] retArr = new String[0]; + retArr = contents.toArray(retArr); + return retArr; + } + + /** Internal get method. Retrieves clusters from ClusterStorages & maintains the memory cache */ + public C2KLocalObject get(Integer sysKeyIntObj, String path) throws ClusterStorageException, ObjectNotFoundException { + C2KLocalObject result = null; + // check cache first + SoftCache sysKeyMemCache = null; + if (memoryCache.containsKey(sysKeyIntObj)) { + sysKeyMemCache = memoryCache.get(sysKeyIntObj); + synchronized(sysKeyMemCache) { + C2KLocalObject obj = sysKeyMemCache.get(path); + if (obj != null) { + Logger.msg(7, "ClusterStorageManager.get() - found "+sysKeyIntObj+"/"+path+" in memcache"); + return obj; + } + } + } + + // special case - loading viewpoint contents + if (path.startsWith(ClusterStorage.VIEWPOINT) && + path.endsWith("/data")) { + StringTokenizer tok = new StringTokenizer(path,"/"); + if (tok.countTokens() == 4) { // to not catch viewpoints called 'data' + Outcome data = null; + Viewpoint view = (Viewpoint)get(sysKeyIntObj, path.substring(0, path.lastIndexOf("/"))); + if (view != null) + data = view.getOutcome(); + return data; + } + } + + // deal out top level remote maps + if (path.indexOf('/') == -1) { + if (path.equals(ClusterStorage.HISTORY)) + result = new History(sysKeyIntObj, null); + if (path.equals(ClusterStorage.JOB)) + result = new JobList(sysKeyIntObj, null); + if (result!=null) { + synchronized(sysKeyMemCache) { + sysKeyMemCache.put(path, result); + } + return result; + } + + } + + // else try each reader in turn until we find it + ArrayList readers = findStorages(ClusterStorage.getClusterType(path), false); + for (ClusterStorage thisReader : readers) { + try { + result = thisReader.get(sysKeyIntObj, path); + Logger.msg(7, "ClusterStorageManager.get() - reading "+path+" from "+thisReader.getName() + " for intkey=" + sysKeyIntObj); + if (result != null) { // got it! + // store it in the cache + if (sysKeyMemCache == null) { // create cache if needed + sysKeyMemCache = new SoftCache(0); + synchronized (memoryCache) { + memoryCache.put(sysKeyIntObj, sysKeyMemCache); + } + } + synchronized(sysKeyMemCache) { + sysKeyMemCache.put(path, result); + } + // then return it + return result; + } + } catch (ClusterStorageException e) { + Logger.msg(7, "ClusterStorageManager.get() - reader " + thisReader.getName() + " could not retrieve " + sysKeyIntObj + + "/" + path + ": " + e.getMessage()); + } + } + throw new ObjectNotFoundException("ClusterStorageManager.get() - Path " + path + " not found in " + sysKeyIntObj, ""); + } + + /** Internal put method. Creates or overwrites a cluster in all writers. Used when committing transactions. */ + public void put(Integer sysKeyIntObj, C2KLocalObject obj) throws ClusterStorageException { + String path = ClusterStorage.getPath(obj); + ArrayList writers = findStorages(ClusterStorage.getClusterType(path), true); + for (ClusterStorage thisWriter : writers) { + try { + Logger.msg(7, "ClusterStorageManager.put() - writing "+path+" to "+thisWriter.getName()); + thisWriter.put(sysKeyIntObj, obj); + } catch (ClusterStorageException e) { + Logger.error("ClusterStorageManager.put() - writer " + thisWriter.getName() + " could not store " + + sysKeyIntObj + "/" + path + ": " + e.getMessage()); + throw e; + } + } + // put in mem cache if that worked + SoftCache sysKeyMemCache; + if (memoryCache.containsKey(sysKeyIntObj)) + sysKeyMemCache = memoryCache.get(sysKeyIntObj); + else { + sysKeyMemCache = new SoftCache(); + synchronized (memoryCache) { + memoryCache.put(sysKeyIntObj, sysKeyMemCache); + } + } + + synchronized(sysKeyMemCache) { + sysKeyMemCache.put(path, obj); + } + + if (Logger.doLog(9)) dumpCacheContents(9); + + // transmit proxy event + EntityProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED)); + } + + /** Deletes a cluster from all writers */ + public void remove(Integer sysKeyIntObj, String path) throws ClusterStorageException { + ArrayList writers = findStorages(ClusterStorage.getClusterType(path), true); + for (ClusterStorage thisWriter : writers) { + try { + Logger.msg(7, "ClusterStorageManager.delete() - removing "+path+" from "+thisWriter.getName()); + thisWriter.delete(sysKeyIntObj, path); + } catch (ClusterStorageException e) { + Logger.error("ClusterStorageManager.delete() - writer " + thisWriter.getName() + " could not delete " + sysKeyIntObj + + "/" + path + ": " + e.getMessage()); + throw e; + } + } + + if (memoryCache.containsKey(sysKeyIntObj)) { + SoftCache sysKeyMemCache = memoryCache.get(sysKeyIntObj); + synchronized (sysKeyMemCache) { + sysKeyMemCache.remove(path); + } + } + + + // transmit proxy event + EntityProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED)); + } + + public void clearCache(Integer sysKeyIntObj, String path) { + Logger.msg(7, "CSM.clearCache() - removing "+sysKeyIntObj+"/"+path); + + if (memoryCache.containsKey(sysKeyIntObj)) { + SoftCache sysKeyMemCache = memoryCache.get(sysKeyIntObj); + synchronized(sysKeyMemCache) { + for (Iterator iter = sysKeyMemCache.keySet().iterator(); iter.hasNext();) { + String thisPath = iter.next(); + if (thisPath.startsWith(path)) { + Logger.msg(7, "CSM.clearCache() - removing "+sysKeyIntObj+"/"+thisPath); + iter.remove(); + } + } + } + } + } + + public void clearCache(Integer sysKeyIntObj) { + + Logger.msg(5, "CSM.clearCache() - removing entire cache of "+sysKeyIntObj); + + if (memoryCache.containsKey(sysKeyIntObj)) { + synchronized (memoryCache) { + if (Logger.doLog(6)) { + SoftCache sysKeyMemCache = memoryCache.get(sysKeyIntObj); + int size = sysKeyMemCache.size(); + Logger.msg(6, "CSM.clearCache() - "+size+" objects to remove."); + } + memoryCache.remove(sysKeyIntObj); + } + } + else + Logger.msg(6, "CSM.clearCache() - No objects cached"); + } + + public void clearCache() { + synchronized (memoryCache) { + memoryCache.clear(); + } + Logger.msg(5, "CSM.clearCache() - cleared entire cache, "+memoryCache.size()+" entities."); + } + + public void dumpCacheContents(int logLevel) { + if (!Logger.doLog(logLevel)) return; + synchronized(memoryCache) { + for (Integer sysKey : memoryCache.keySet()) { + Logger.msg(logLevel, "Cached Objects of Entity "+sysKey); + SoftCache sysKeyMemCache = memoryCache.get(sysKey); + try { + synchronized(sysKeyMemCache) { + for (Object name : sysKeyMemCache.keySet()) { + String path = (String) name; + try { + Logger.msg(logLevel, " Path "+path+": "+sysKeyMemCache.get(path).getClass().getName()); + } catch (NullPointerException e) { + Logger.msg(logLevel, " Path "+path+": reaped"); + } + } + } + } catch (ConcurrentModificationException ex) { + Logger.msg(logLevel, "Cache modified - aborting"); + } + } + Logger.msg(logLevel, "Total number of cached entities: "+memoryCache.size()); + } + } + + public Object query(String id, Object query) throws ClusterStorageException { + ClusterStorage requiredStorage = allStores.get(id); + if (requiredStorage == null) + throw new ClusterStorageException("Storage "+id+" not found."); + return requiredStorage.query(query); + } + + public String queryToXML(String id, String query, boolean genericFormat) throws ClusterStorageException { + ClusterStorage requiredStorage = allStores.get(id); + if (requiredStorage == null) + throw new ClusterStorageException("Storage "+id+" not found."); + return requiredStorage.queryToXML(query, genericFormat); + } +} diff --git a/src/main/java/com/c2kernel/persistency/LDAPClientReader.java b/src/main/java/com/c2kernel/persistency/LDAPClientReader.java new file mode 100644 index 0000000..ac9215c --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/LDAPClientReader.java @@ -0,0 +1,43 @@ +package com.c2kernel.persistency; + +import com.c2kernel.entity.C2KLocalObject; + +/** Allows clients to directly load properties and collections from the LDAP +* so no CORBA calls need to be made during normal browsing +*/ + +public class LDAPClientReader extends LDAPClusterStorage { + // return all readwrite support as readonly + @Override + public short queryClusterSupport(String clusterType) { + return (short)(super.queryClusterSupport(clusterType) & READ); + } + + + /** + * @see com.c2kernel.persistency.ClusterStorage#delete(Integer, String) + */ + @Override + public void delete(Integer sysKey, String path) + throws ClusterStorageException { + throw new ClusterStorageException("Writing not supported in ClientReader"); + } + + /** + * @see com.c2kernel.persistency.ClusterStorage#getName() + */ + @Override + public String getName() { + return "LDAP Client Cluster Reader"; + } + + /** + * @see com.c2kernel.persistency.ClusterStorage#put(Integer, String, C2KLocalObject) + */ + + public void put(Integer sysKey, String path, C2KLocalObject obj) + throws ClusterStorageException { + throw new ClusterStorageException("Writing not supported in ClientReader"); + } + +} diff --git a/src/main/java/com/c2kernel/persistency/LDAPClusterStorage.java b/src/main/java/com/c2kernel/persistency/LDAPClusterStorage.java new file mode 100644 index 0000000..16ac7a0 --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/LDAPClusterStorage.java @@ -0,0 +1,172 @@ +package com.c2kernel.persistency; +import java.util.ArrayList; +import java.util.StringTokenizer; + +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.lookup.EntityPath; +import com.c2kernel.lookup.InvalidEntityPathException; +import com.c2kernel.lookup.LDAPPropertyManager; +import com.c2kernel.process.Gateway; +import com.c2kernel.property.Property; +import com.c2kernel.utils.Logger; + +public class LDAPClusterStorage extends ClusterStorage { + LDAPPropertyManager ldapStore; + + @Override + public void open() throws ClusterStorageException { + ldapStore = Gateway.getLDAPLookup().getPropManager(); + + } + + @Override + public void close() throws ClusterStorageException { + } + + // introspection + @Override + public short queryClusterSupport(String clusterType) { + if (clusterType.equals(PROPERTY)) + return READWRITE; + else + return NONE; + } + + @Override + public String getName() { + return "LDAP Cluster Storage"; + } + + @Override + public String getId() { + return "LDAP"; + } + + // retrieve object by path + @Override + public C2KLocalObject get(Integer sysKey, String path) throws ClusterStorageException { + Logger.msg(6, "LDAPClusterStorage.get() - "+sysKey+"/"+path); + StringTokenizer tok = new StringTokenizer(path, "/"); + int pathLength = tok.countTokens(); + if (pathLength != 2) + throw new ClusterStorageException("Path length was invalid: "+path); + String type = tok.nextToken(); + + EntityPath thisEntity; + try { + thisEntity = new EntityPath(sysKey.intValue()); + } catch (InvalidEntityPathException e) { + throw new ClusterStorageException("Invalid Syskey:"+sysKey); + } + + String objName = tok.nextToken(); + C2KLocalObject newObj; + + if (type.equals(PROPERTY)) { + try { + String value = ldapStore.getPropertyValue(thisEntity, objName); + Property newProperty = new Property(); + newProperty.setName(objName); + newProperty.setValue(value); + newObj = newProperty; + } catch (ObjectNotFoundException ex) { + throw new ClusterStorageException("Property "+objName+" not found in "+sysKey); + } + + } + else + throw new ClusterStorageException("Cluster type "+type+" not supported."); + + return newObj; + } + // store object by path + @Override + public void put(Integer sysKey, C2KLocalObject obj) throws ClusterStorageException { + Logger.msg(6, "LDAPClusterStorage.put() - "+sysKey+"/"+ClusterStorage.getPath(obj)); + + String type = obj.getClusterType(); + + EntityPath thisEntity; + try { + thisEntity = new EntityPath(sysKey.intValue()); + } catch (InvalidEntityPathException e) { + throw new ClusterStorageException("Invalid Syskey:"+sysKey); + } + + if (type.equals(PROPERTY)) { + try { + ldapStore.setProperty(thisEntity, (Property)obj); + } catch (Exception e1) { + Logger.error(e1); + throw new ClusterStorageException("LDAPClusterStorage - could not write property"); + } + } + else + throw new ClusterStorageException("Cluster type "+type+" not supported."); + + } + // delete cluster + @Override + public void delete(Integer sysKey, String path) throws ClusterStorageException { + StringTokenizer tok = new StringTokenizer(path, "/"); + int pathLength = tok.countTokens(); + if (pathLength != 2) + throw new ClusterStorageException("Path length was invalid: "+path); + String type = tok.nextToken(); + + EntityPath thisEntity; + try { + thisEntity = new EntityPath(sysKey.intValue()); + } catch (InvalidEntityPathException e) { + throw new ClusterStorageException("Invalid Syskey:"+sysKey); + } + + if (type.equals(PROPERTY)) { + try { + ldapStore.deleteProperty(thisEntity, tok.nextToken()); + } catch (Exception e1) { + Logger.error(e1); + throw new ClusterStorageException("LDAPClusterStorage - could not delete property"); + } + } + else + throw new ClusterStorageException("Cluster type "+type+" not supported."); + + } + + /* navigation */ + + // directory listing + @Override + public String[] getClusterContents(Integer sysKey, String path) throws ClusterStorageException { + Logger.msg(6, "LDAPClusterStorage.getClusterContents() - "+sysKey+"/"+path); + StringTokenizer tok = new StringTokenizer(path, "/"); + int pathLength = tok.countTokens(); + if (pathLength > 1) + return new String[0]; + + String type = getClusterType(path); + try + { + EntityPath thisEntity = new EntityPath(sysKey.intValue()); + if (type.equals(PROPERTY)) + return ldapStore.getPropertyNames(thisEntity); + else + if (type.equals("")) { // root query + String[] allClusters = new String[0]; + ArrayList clusterList = new ArrayList(); + if (ldapStore.hasProperties(thisEntity)) + clusterList.add(PROPERTY); + allClusters = clusterList.toArray(allClusters); + return allClusters; + } + else + throw new ClusterStorageException("Cluster type "+type+" not supported."); + } catch (InvalidEntityPathException e) { + throw new ClusterStorageException("Invalid Syskey:"+sysKey); + } catch (ObjectNotFoundException e) { + throw new ClusterStorageException("Entity "+sysKey+" does not exist"); + } + } +} diff --git a/src/main/java/com/c2kernel/persistency/ProxyLoader.java b/src/main/java/com/c2kernel/persistency/ProxyLoader.java new file mode 100644 index 0000000..e614b0d --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/ProxyLoader.java @@ -0,0 +1,133 @@ +package com.c2kernel.persistency; +import java.util.HashMap; +import java.util.StringTokenizer; + +import com.c2kernel.entity.AgentHelper; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.entity.ItemHelper; +import com.c2kernel.entity.ManageableEntity; +import com.c2kernel.lookup.EntityPath; +import com.c2kernel.lookup.LDAPLookup; +import com.c2kernel.persistency.outcome.Outcome; +import com.c2kernel.process.Gateway; +import com.c2kernel.utils.CastorXMLUtility; +import com.c2kernel.utils.Logger; + +/** Used by proxies to load clusters by queryData from the Entity. +* Last client storage - only used if not cached elsewhere +*/ + +public class ProxyLoader extends ClusterStorage { + HashMap entities = new HashMap(); + LDAPLookup lookup; + + @Override + public void open() throws ClusterStorageException { + lookup = Gateway.getLDAPLookup(); + } + + @Override + public void close() throws ClusterStorageException { + } + // introspection + @Override + public short queryClusterSupport(String clusterType) { + return READ; + } + + @Override + public String getName() { + return "Proxy Cluster Loader"; + } + + @Override + public String getId() { + return "CORBA"; + } + + // retrieve object by path + @Override + public C2KLocalObject get(Integer sysKey, String path) throws ClusterStorageException { + try { + ManageableEntity thisEntity = getIOR(sysKey); + String type = getClusterType(path); + + // fetch the xml from the item + String queryData = thisEntity.queryData(path); + + if (queryData != null) { + if (type.equals(OUTCOME)) + return new Outcome(path, queryData); + else + return (C2KLocalObject)CastorXMLUtility.unmarshall(queryData); + } + } catch (Exception e) { + //Logger.error(e); + throw new ClusterStorageException(e.getMessage()); + } + return null; + } + + // store object by path + @Override + public void put(Integer sysKey, C2KLocalObject obj) throws ClusterStorageException { + // not supported + throw new ClusterStorageException("Cannot write to items through the ProxyLoader"); + } + // delete cluster + @Override + public void delete(Integer sysKey, String path) throws ClusterStorageException { + // not supported + throw new ClusterStorageException("Cannot write to items through the ProxyLoader"); + } + + /* navigation */ + + // directory listing + @Override + public String[] getClusterContents(Integer sysKey, String path) throws ClusterStorageException { + try { + ManageableEntity thisEntity = getIOR(sysKey); + String contents = thisEntity.queryData(path+"/all"); + StringTokenizer tok = new StringTokenizer(contents, ","); + String[] result = new String[tok.countTokens()]; + for (int i=0; i extends TreeMap implements C2KLocalObject { + + private int mID=-1; + private String mName; + protected int mSysKey; + private String mPath = ""; + Object keyLock = null; + TransactionManager storage; + EntityProxyObserver listener; + Comparator comp; + EntityProxy source; + Object mLocker; // if this remote map will participate in a transaction + + public RemoteMap(int sysKey, String path, Object locker) { + + super(new Comparator() { + @Override + public int compare(String o1, String o2) { + Integer i1 = null, i2 = null; + try { + i1 = Integer.valueOf(o1); + i2 = Integer.valueOf(o2); + return i1.compareTo(i2); + } catch (NumberFormatException ex) { } + return o1.compareTo(o2); + } + }); + + mSysKey = sysKey; + mLocker = locker; + + // split the path into path/name + int lastSlash = path.lastIndexOf("/"); + mName = path.substring(lastSlash+1); + if (lastSlash>0) mPath = path.substring(0,lastSlash); + + // see if the name is also a suitable id + try { + mID = Integer.parseInt(mName); + } catch (NumberFormatException e) {} + storage = Gateway.getStorage(); + + listener = new EntityProxyObserver() { + @Override + public void add(V obj) { + synchronized (this) { + putLocal(obj.getName(), obj); + } + } + + @Override + public void remove(String id) { + synchronized (this) { + removeLocal(id); + } + } + + @Override + public void control(String control, String msg) { } + }; + + try { + source = Gateway.getProxyManager().getProxy(new EntityPath(sysKey)); + source.subscribe(new MemberSubscription(listener, path, false)); + } catch (Exception ex) { + Logger.error("Error subscribing to remote map. Changes will not be received"); + Logger.error(ex); + } + } + + protected void loadKeys() { + if (keyLock != null) return; + clear(); + keyLock = new Object(); + synchronized(this) { + String[] keys; + try { + keys = storage.getClusterContents(mSysKey, mPath+mName); + for (String key : keys) super.put(key, null); + } catch (ClusterStorageException e) { + Logger.error(e); + } + + } + } + + public synchronized int getLastId() { + loadKeys(); + if (size() == 0) return -1; + try { + return Integer.parseInt(lastKey()); + } catch (NumberFormatException ex) { + return -1; + } + } + + + // c2kLocalObject methods + public void setID(int id) { mID = id; } + + public int getID() { return mID; } + + @Override + public void setName(String name) { mName = name; } + + @Override + public String getName() { return mName; } + + /** + * Cannot be stored + */ + @Override + public String getClusterType() { + return null; + } + /** + * @see java.util.Map#clear() + */ + @Override + public synchronized void clear() { + synchronized (this) { + super.clear(); + } + keyLock = null; + } + + + + /** + * @see java.util.Map#containsKey(Object) + */ + @Override + public synchronized boolean containsKey(Object key) { + if (keyLock == null) loadKeys(); + return super.containsKey(key); + } + + /** + * This must retrieve all the values until a match is made. + * Very expensive, but if you must, you must. + * @see java.util.Map#containsValue(Object) + */ + @Override + public synchronized boolean containsValue(Object value) { + loadKeys(); + synchronized(this) { + for (String key: keySet()) { + if (get(key).equals(value)) return true; + } + } + return false; + } + + + /** + * @see java.util.Map#get(Object) + */ + @Override + public synchronized V get(Object objKey) { + loadKeys(); + String key; + if (objKey instanceof Integer) + key = ((Integer)objKey).toString(); + else if (objKey instanceof String) + key = (String)objKey; + else + return null; + + synchronized(this) { + try { + V value = super.get(key); + if (value == null) { + value = (V)storage.get(mSysKey, mPath+mName+"/"+key, mLocker); + super.put(key, value); + } + return value; + } catch (ClusterStorageException e) { + Logger.error(e); + } catch (ObjectNotFoundException e) { + Logger.error(e); + } + } + return null; + } + + /** + * @see java.util.Map#isEmpty() + */ + @Override + public synchronized boolean isEmpty() { + loadKeys(); + return super.isEmpty(); + } + + /** + * @see java.util.Map#keySet() + */ + @Override + public synchronized Set keySet() { + loadKeys(); + return super.keySet(); + } + + /** + * Inserts the given object into the storage + * the key is ignored - it can be fetched from the value. + * @see java.util.Map#put(Object, Object) + */ + @Override + public synchronized V put(String key, V value) { + try { + synchronized(this) { + storage.put(mSysKey, value, mLocker); + return putLocal(key, value); + } + } catch (ClusterStorageException e) { + Logger.error(e); + return null; + } + } + + protected synchronized V putLocal(String key, V value) { + return super.put(key, value); + } + + /** + * @see java.util.Map#remove(Object) + */ + @Override + public synchronized V remove(Object key) { + loadKeys(); + if (containsKey(key)) try { + synchronized(keyLock) { + storage.remove(mSysKey, mPath+mName+"/"+key, mLocker); + return super.remove(key); + } + } catch (ClusterStorageException e) { + Logger.error(e); + } + return null; + } + + protected synchronized V removeLocal(Object key) { + return super.remove(key); + } + + /** + * @see java.util.Map#size() + */ + @Override + public synchronized int size() { + loadKeys(); + return super.size(); + } + + /** + * @see java.util.Map#values() + */ + @Override + public synchronized Collection values() { + return new RemoteSet(this); + } + + /** + * Basic implementation of Set and Collection to bridge to the Iterator + * Disallows all writes. + */ + + private class RemoteSet extends AbstractSet { + RemoteMap mParent; + + public RemoteSet(RemoteMap parent) { + mParent = parent; + } + + // no modifications allowed + @Override + public boolean add(E o) { + throw new UnsupportedOperationException(); + } + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); + } + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() { + return new RemoteIterator(mParent); + } + + @Override + public int size() { + return mParent.size(); + } + + } + /** + * Iterator view on RemoteMap data. Doesn't preload anything. + * REVISIT: Will go strange if the RemoteMap is modified. Detect this and throw ConcurrentMod ex + */ + private class RemoteIterator implements Iterator { + RemoteMap mParent; + Iterator iter; + + public RemoteIterator(RemoteMap parent) { + mParent = parent; + iter = mParent.keySet().iterator(); + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public C next() { + return mParent.get(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + } + + + + +} diff --git a/src/main/java/com/c2kernel/persistency/TransactionManager.java b/src/main/java/com/c2kernel/persistency/TransactionManager.java new file mode 100644 index 0000000..d2679a8 --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/TransactionManager.java @@ -0,0 +1,324 @@ +package com.c2kernel.persistency; + +import java.util.ArrayList; +import java.util.HashMap; + +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.entity.agent.JobList; +import com.c2kernel.events.History; +import com.c2kernel.utils.Logger; + +public class TransactionManager { + + HashMap locks; + HashMap> pendingTransactions; + ClusterStorageManager storage; + + public TransactionManager() throws ClusterStorageException { + storage = new ClusterStorageManager(); + locks = new HashMap(); + pendingTransactions = new HashMap>(); + } + + public boolean hasPendingTransactions() + { + return pendingTransactions.size() > 0; + } + + public ClusterStorageManager getDb() { + return storage; + } + + public void close() { + if (pendingTransactions.size() != 0) { + Logger.error("There were pending transactions on shutdown. All changes were lost."); + dumpPendingTransactions(0); + } + Logger.msg("Transaction Manager: Closing storages"); + storage.close(); + } + + public String[] getClusterContents(int sysKey, String path) throws ClusterStorageException { + if (path.startsWith("/") && path.length() > 1) path = path.substring(1); + return storage.getClusterContents(new Integer(sysKey), path); + } + + /** + * Public get method. Required a 'locker' object for a transaction key. + * Checks the transaction table first to see if the caller has uncommitted changes + */ + public C2KLocalObject get(int sysKey, String path, Object locker) + throws ClusterStorageException, + ObjectNotFoundException { + if (path.startsWith("/") && path.length() > 1) path = path.substring(1); + + // deal out top level remote maps, if transactions aren't needed + if (path.indexOf('/') == -1) { + if (path.equals(ClusterStorage.HISTORY) && locker != null) + return new History(sysKey, locker); + if (path.equals(ClusterStorage.JOB) && locker != null) + return new JobList(sysKey, locker); + } + + Integer sysKeyIntObj = new Integer(sysKey); + // check to see if the locker has been modifying this cluster + synchronized(locks) { + if (locks.containsKey(sysKeyIntObj) && locks.get(sysKeyIntObj).equals(locker)) { + ArrayList lockerTransaction = pendingTransactions.get(locker); + for (TransactionEntry thisEntry : lockerTransaction) { + if (sysKey == thisEntry.sysKey.intValue() && path.equals(thisEntry.getPath())) { + if (thisEntry.obj == null) + throw new ClusterStorageException("ClusterStorageManager.get() - Cluster " + path + " has been deleted in " + sysKey + + " but not yet committed"); + return thisEntry.obj; + } + } + } + } + return storage.get(sysKeyIntObj, path); + } + + /** + * Public put method. Manages the transaction table keyed by the object 'locker'. + * If this object is null, transaction support is bypassed (so long as no lock exists on that object). + */ + public void put(int sysKey, C2KLocalObject obj, Object locker) throws ClusterStorageException { + Integer sysKeyIntObj = new Integer(sysKey); + ArrayList lockerTransaction; + String path = ClusterStorage.getPath(obj); + + synchronized(locks) { + // look to see if this object is already locked + if (locks.containsKey(sysKeyIntObj)) { + // if it's this locker, get the transaction list + Object thisLocker = locks.get(sysKeyIntObj); + if (thisLocker.equals(locker)) // retrieve the transaction list + lockerTransaction = pendingTransactions.get(locker); + else // locked by someone else + throw new ClusterStorageException("ClusterStorageManager.get() - Access denied: Object " + sysKeyIntObj + + " has been locked for writing by " + thisLocker); + } + else { // either we are the locker, or there is no locker + if (locker == null) { // non-locking put/delete + storage.put(sysKeyIntObj, obj); + return; + } + else {// initialise the transaction + locks.put(sysKeyIntObj, locker); + lockerTransaction = new ArrayList(); + pendingTransactions.put(locker, lockerTransaction); + } + } + + // create the new entry in the transaction table + TransactionEntry newEntry = new TransactionEntry(sysKeyIntObj, path, obj); + /* equals() in TransactionEntry only compares sysKey and path, so we can use + * contains() in ArrayList to looks for preexisting entries for this cluster + * and overwrite them. + */ + if (lockerTransaction.contains(newEntry)) + lockerTransaction.remove(newEntry); + lockerTransaction.add(newEntry); + } + } + + /** Public delete method. Uses the put method, with null as the object value. + */ + public void remove(int sysKey, String path, Object locker) throws ClusterStorageException { + Integer sysKeyIntObj = new Integer(sysKey); + ArrayList lockerTransaction; + synchronized(locks) { + // look to see if this object is already locked + if (locks.containsKey(sysKeyIntObj)) { + // if it's this locker, get the transaction list + Object thisLocker = locks.get(sysKeyIntObj); + if (thisLocker.equals(locker)) // retrieve the transaction list + lockerTransaction = pendingTransactions.get(locker); + else // locked by someone else + throw new ClusterStorageException("ClusterStorageManager.get() - Access denied: Object " + sysKeyIntObj + + " has been locked for writing by " + thisLocker); + } + else { // either we are the locker, or there is no locker + if (locker == null) { // non-locking put/delete + storage.remove(sysKeyIntObj, path); + return; + } + else {// initialise the transaction + locks.put(sysKeyIntObj, locker); + lockerTransaction = new ArrayList(); + pendingTransactions.put(locker, lockerTransaction); + } + } + + // create the new entry in the transaction table + TransactionEntry newEntry = new TransactionEntry(sysKeyIntObj, path, null); + /* equals() in TransactionEntry only compares sysKey and path, so we can use + * contains() in ArrayList to looks for preexisting entries for this cluster + * and overwrite them. + */ + if (lockerTransaction.contains(newEntry)) + lockerTransaction.remove(newEntry); + lockerTransaction.add(newEntry); + } + } + + /** + * Removes all child objects from the given path + * + * @param sysKey - entity to delete from + * @param path - root path to delete + * @param locker - locking object + * + * @throws ClusterStorageException - when deleting fails + */ + public void removeCluster(int sysKey, String path, Object locker) throws ClusterStorageException { + + String[] children = getClusterContents(sysKey, path); + for (String element : children) + removeCluster(sysKey, path+(path.length()>0?"/":"")+element, locker); + if (children.length==0 && path.indexOf("/") > -1) + remove(sysKey, path, locker); + + } + /** + * Writes all pending changes to the backends. + */ + public void commit(Object locker) { + synchronized(locks) { + ArrayList lockerTransactions = pendingTransactions.get(locker); + HashMap exceptions = new HashMap(); + // quit if no transactions are present; + if (lockerTransactions == null) return; + for (TransactionEntry thisEntry : lockerTransactions) { + try { + if (thisEntry.obj == null) + storage.remove(thisEntry.sysKey, thisEntry.path); + else + storage.put(thisEntry.sysKey, thisEntry.obj); + locks.remove(thisEntry.sysKey); + } catch (Exception e) { + exceptions.put(thisEntry, e); + } + } + pendingTransactions.remove(locker); + if (exceptions.size() > 0) { // oh dear + Logger.error("TransactionManager.commit() - Problems during transaction commit of locker "+locker.toString()+". Database may be in an inconsistent state."); + for (TransactionEntry entry : exceptions.keySet()) { + Exception ex = exceptions.get(entry); + Logger.msg(entry.toString()); + Logger.error(ex); + } + dumpPendingTransactions(0); + Logger.die("Database failure"); + } + + } + } + + /** + * Rolls back all changes sent in the name of 'locker' and unlocks the sysKeys + */ + public void abort(Object locker) { + synchronized(locks) { + if (locks.containsValue(locker)) { + for (Integer thisKey : locks.keySet()) { + if (locks.get(thisKey).equals(locker)) + locks.remove(thisKey); + } + } + pendingTransactions.remove(locker); + } + } + + public void clearCache(int sysKey, String path) { + if (sysKey == -1) + storage.clearCache(); + else if (path == null) + storage.clearCache(new Integer(sysKey)); + else + storage.clearCache(new Integer(sysKey), path); + + } + + public void dumpPendingTransactions(int logLevel) { + Logger.msg(logLevel, "================"); + Logger.msg(logLevel, "Transaction dump"); + Logger.msg(logLevel, "Locked Items:"); + if (locks.size() == 0) + Logger.msg(logLevel, " None"); + else + for (Integer thisKey : locks.keySet()) { + Object locker = locks.get(thisKey); + Logger.msg(logLevel, " "+thisKey+" locked by "+locker); + } + + Logger.msg(logLevel, "Open transactions:"); + if (pendingTransactions.size() == 0) + Logger.msg(logLevel, " None"); + else + for (Object thisLocker : pendingTransactions.keySet()) { + Logger.msg(logLevel, " Transaction owner:"+thisLocker); + ArrayList entries = pendingTransactions.get(thisLocker); + for (TransactionEntry thisEntry : entries) { + Logger.msg(logLevel, " "+thisEntry.toString()); + } + } + } + + /** Used in the transaction table to store details of a put until commit + */ + class TransactionEntry { + public Integer sysKey; + public String path; + public C2KLocalObject obj; + public TransactionEntry(Integer sysKey, String path, C2KLocalObject obj) { + this.sysKey = sysKey; + this.path = path; + this.obj = obj; + } + + public String getPath() { + return ClusterStorage.getPath(obj); + } + + @Override + public String toString() { + StringBuffer report = new StringBuffer(); + if (obj == null) + report.append("Delete"); + else + report.append("Put "+obj.getClass().getName()); + report.append(" at ").append(path).append(" in ").append(sysKey); + return report.toString(); + + } + /** + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + return sysKey.hashCode()*getPath().hashCode(); + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object other) { + if (other instanceof TransactionEntry) + return hashCode() == ((TransactionEntry)other).hashCode(); + return false; + } + + } + + public Object query(String id, Object query) throws ClusterStorageException { + return storage.query(id, query); + } + + public String queryToXML(String id, String query, boolean genericFormat) throws ClusterStorageException { + return storage.queryToXML(id, query, genericFormat); + } + +} diff --git a/src/main/java/com/c2kernel/persistency/XMLClusterStorage.java b/src/main/java/com/c2kernel/persistency/XMLClusterStorage.java new file mode 100644 index 0000000..5909fac --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/XMLClusterStorage.java @@ -0,0 +1,154 @@ +package com.c2kernel.persistency; +import java.io.File; +import java.util.ArrayList; + +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.lookup.EntityPath; +import com.c2kernel.lookup.InvalidEntityPathException; +import com.c2kernel.persistency.outcome.Outcome; +import com.c2kernel.process.Gateway; +import com.c2kernel.utils.CastorXMLUtility; +import com.c2kernel.utils.FileStringUtility; +import com.c2kernel.utils.Logger; + +public class XMLClusterStorage extends ClusterStorage { + String rootDir=null; + + public XMLClusterStorage() { + } + + @Override + public void open() throws ClusterStorageException { + String rootProp = Gateway.getProperty("XMLStorage.root"); + if (rootProp == null) + throw new ClusterStorageException("XMLClusterStorage.open() - Root path not given in config file."); + + rootDir = new File(rootProp).getAbsolutePath(); + + if( !FileStringUtility.checkDir( rootDir ) ) { + Logger.error("XMLClusterStorage.open() - Path " + rootDir + "' does not exist. Attempting to create."); + boolean success = FileStringUtility.createNewDir(rootDir); + if (!success) throw new ClusterStorageException("XMLClusterStorage.open() - Could not create dir "+ rootDir +". Cannot continue."); + } + } + + @Override + public void close() { + rootDir = null; + } + + // introspection + @Override + public short queryClusterSupport(String clusterType) { + return ClusterStorage.READWRITE; + } + + @Override + public String getName() { + return "XML File Cluster Storage"; + } + + @Override + public String getId() { + return "XML"; + } + + /* object manipulation */ + + // retrieve object by path + @Override + public C2KLocalObject get(Integer sysKey, String path) throws ClusterStorageException { + try { + String type = ClusterStorage.getClusterType(path); + String filePath = getFilePath(sysKey, path)+".xml"; + String objString = FileStringUtility.file2String(filePath); + if (objString.length() == 0) return null; + + if (type.equals("Outcome")) + return new Outcome(path, objString); + else + return (C2KLocalObject)CastorXMLUtility.unmarshall(objString); + + } catch (Exception e) { + Logger.msg(3,"XMLClusterStorage.get() - The path "+path+" from "+sysKey+" does not exist.: "+e.getMessage()); + } + return null; + } + + // store object by path + @Override + public void put(Integer sysKey, C2KLocalObject obj) throws ClusterStorageException { + try { + String filePath = getFilePath(sysKey, getPath(obj)+".xml"); + Logger.msg(7, "Writing "+filePath); + String data = CastorXMLUtility.marshall(obj); + + String dir = filePath.substring(0, filePath.lastIndexOf('/')); + if( !FileStringUtility.checkDir( dir ) ) { + boolean success = FileStringUtility.createNewDir(dir); + if (!success) throw new ClusterStorageException("XMLClusterStorage.put() - Could not create dir "+ dir +". Cannot continue."); + } + FileStringUtility.string2File(filePath, data); + } catch (Exception e) { + Logger.error(e); + throw new ClusterStorageException("XMLClusterStorage.put() - Could not write "+getPath(obj)+" to "+sysKey); + } + } + + // delete cluster + @Override + public void delete(Integer sysKey, String path) throws ClusterStorageException { + try { + String filePath = getFilePath(sysKey, path+".xml"); + boolean success = FileStringUtility.deleteDir(filePath, true, true); + if (success) return; + filePath = getFilePath(sysKey, path); + success = FileStringUtility.deleteDir(filePath, true, true); + if (success) return; + } catch(Exception e) { } + throw new ClusterStorageException("XMLClusterStorage.delete() - Failure deleting path "+path+" in "+sysKey); + } + + /* navigation */ + + // directory listing + @Override + public String[] getClusterContents(Integer sysKey, String path) throws ClusterStorageException { + String[] result = new String[0]; + try { + String filePath = getFilePath(sysKey, path); + ArrayList paths = FileStringUtility.listDir( filePath, true, false ); + if (paths == null) return result; // dir doesn't exist yet + ArrayList contents = new ArrayList(); + String previous = null; + for (int i=0; i -1) next = next.substring(next.lastIndexOf('/')+1); + contents.add(next); + } + + result = contents.toArray(result); + return result; + } catch (Exception e) { + Logger.error(e); + throw new ClusterStorageException("XMLClusterStorage.getClusterContents() - Could not get contents of "+path+" from "+sysKey+": "+e.getMessage()); + } + } + + protected String getFilePath(Integer sysKey, String path) throws InvalidEntityPathException { + EntityPath thisEntity = new EntityPath(sysKey.intValue()); + if (path.length() == 0 || path.charAt(0) != '/') path = "/"+path; + String filePath = rootDir+thisEntity.toString()+path; + Logger.msg(8, "XMLClusterStorage.getFilePath() - "+filePath); + return filePath; + } +} diff --git a/src/main/java/com/c2kernel/persistency/outcome/Outcome.java b/src/main/java/com/c2kernel/persistency/outcome/Outcome.java new file mode 100644 index 0000000..d321f69 --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/outcome/Outcome.java @@ -0,0 +1,177 @@ +package com.c2kernel.persistency.outcome; +import java.io.StringReader; +import java.io.StringWriter; +import java.util.StringTokenizer; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.apache.xml.serialize.Method; +import org.apache.xml.serialize.OutputFormat; +import org.apache.xml.serialize.XMLSerializer; +import org.w3c.dom.Document; +import org.w3c.dom.NodeList; +import org.w3c.dom.Text; +import org.xml.sax.InputSource; + +import com.c2kernel.common.PersistencyException; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.persistency.ClusterStorage; +import com.c2kernel.utils.Logger; + +public class Outcome implements C2KLocalObject { + int mID = -1; + String mData; + String mSchemaType; + int mSchemaVersion; + Document dom; + static DocumentBuilder parser; + + static { + DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); + dbf.setValidating(false); + dbf.setNamespaceAware(false); + try { + parser = dbf.newDocumentBuilder(); + } catch (ParserConfigurationException e) { + Logger.error(e); + } + } + + //id is the eventID + public Outcome(int id, String data, String schemaType, int schemaVersion) { + mID = id; + mData = data; + mSchemaType = schemaType; + mSchemaVersion = schemaVersion; + } + + public Outcome(String path, String data) throws PersistencyException { + // derive all the meta data from the path + StringTokenizer tok = new StringTokenizer(path,"/"); + if (tok.countTokens() != 3 && !(tok.nextToken().equals("Outcome"))) + throw new PersistencyException("Outcome() - Outcome path must have three components: "+path, null); + mSchemaType = tok.nextToken(); + String verstring = tok.nextToken(); + String objId = tok.nextToken(); + try { + mSchemaVersion = Integer.parseInt(verstring); + } catch (NumberFormatException ex) { + throw new PersistencyException("Outcome() - Outcome version was an invalid number: "+verstring, null); + } + try { + mID = Integer.parseInt(objId); + } catch (NumberFormatException ex) { + mID = -1; + } + mData = data; + } + + public void setID(int ID) { + mID = ID; + } + + public int getID() { + return mID; + } + + @Override + public void setName(String name) { + try { + mID = Integer.parseInt(name); + } catch (NumberFormatException e) { + Logger.error("Invalid id set on Outcome:"+name); + } + } + + @Override + public String getName() { + return String.valueOf(mID); + } + + public void setData(String data) { + mData = data; + dom = null; + } + + public void setData(Document data) { + mData = serialize(data, false); + dom = data; + } + + public String getData() { + return mData; + } + + public void setSchemaType(String schemaType) { + mSchemaType = schemaType; + } + + public String getSchemaType() { + return mSchemaType; + } + + public void setSchemaURL(int schemaVersion) { + mSchemaVersion = schemaVersion; + } + + public int getSchemaVersion() { + return mSchemaVersion; + } + + public void setSchemaVersion(int schVer) { + mSchemaVersion = schVer; + } + + @Override + public String getClusterType() { + return ClusterStorage.OUTCOME; + } + + // special script API methods + + /** + * Parses the outcome into a DOM tree + * @return a DOM Document + */ + public Document getDOM() { + if (dom == null) + try { + synchronized (parser) { + dom = parser.parse(new InputSource(new StringReader(mData))); + } + } catch (Exception e) { + Logger.error(e); + return null; + } + return dom; + } + + public String getField(String name) { + NodeList elements = getDOM().getDocumentElement().getElementsByTagName(name); + if (elements.getLength() == 1 && elements.item(0).hasChildNodes() && elements.item(0).getFirstChild() instanceof Text) + return ((Text)elements.item(0).getFirstChild()).getData(); + else + return null; + } + + static public String serialize(Document doc, boolean prettyPrint) + { + String serializedDoc = null; + OutputFormat format = new OutputFormat(Method.XML, null, prettyPrint); + StringWriter stringOut = new StringWriter(); + XMLSerializer serial = new XMLSerializer(stringOut, format); + try + { + serial.asDOMSerializer(); + serial.serialize(doc); + } + catch (java.io.IOException ex) + { + Logger.error(ex.toString()); + } + serializedDoc = stringOut.toString(); + return serializedDoc; + } +} diff --git a/src/main/java/com/c2kernel/persistency/outcome/OutcomeValidator.java b/src/main/java/com/c2kernel/persistency/outcome/OutcomeValidator.java new file mode 100644 index 0000000..73f5706 --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/outcome/OutcomeValidator.java @@ -0,0 +1,188 @@ + +package com.c2kernel.persistency.outcome; + +import java.io.IOException; +import java.io.StringReader; + +import org.apache.xerces.parsers.DOMParser; +import org.apache.xerces.parsers.IntegratedParserConfiguration; +import org.apache.xerces.parsers.XMLGrammarPreparser; +import org.apache.xerces.util.SymbolTable; +import org.apache.xerces.util.XMLGrammarPoolImpl; +import org.apache.xerces.xni.XNIException; +import org.apache.xerces.xni.grammars.XMLGrammarDescription; +import org.apache.xerces.xni.parser.XMLErrorHandler; +import org.apache.xerces.xni.parser.XMLInputSource; +import org.apache.xerces.xni.parser.XMLParseException; +import org.apache.xerces.xni.parser.XMLParserConfiguration; +import org.xml.sax.ErrorHandler; +import org.xml.sax.SAXException; +import org.xml.sax.SAXParseException; + +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.utils.Logger; + +/************************************************************************** + * + * $Revision: 1.24 $ + * $Date: 2005/06/09 13:50:10 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + + +public class OutcomeValidator implements ErrorHandler, XMLErrorHandler { + + protected static final String NAMESPACES_FEATURE_ID = "http://xml.org/sax/features/namespaces"; + /** Validation feature id (http://xml.org/sax/features/validation). */ + protected static final String VALIDATION_FEATURE_ID = "http://xml.org/sax/features/validation"; + /** Schema validation feature id (http://apache.org/xml/features/validation/schema). */ + protected static final String SCHEMA_VALIDATION_FEATURE_ID = "http://apache.org/xml/features/validation/schema"; + /** Schema full checking feature id (http://apache.org/xml/features/validation/schema-full-checking). */ + protected static final String SCHEMA_FULL_CHECKING_FEATURE_ID = "http://apache.org/xml/features/validation/schema-full-checking"; + public static final String GRAMMAR_POOL = "http://apache.org/xml/properties/internal/grammar-pool"; + + static SchemaValidator schemaValid = new SchemaValidator(); + + Schema schema; + protected StringBuffer errors = null; + XMLGrammarPoolImpl schemaGrammarPool = new XMLGrammarPoolImpl(1); + SymbolTable sym = new SymbolTable(); + + public static OutcomeValidator getValidator(Schema schema) throws InvalidDataException { + String schemaId = schema.docType+"_"+schema.docVersion; + + if (schemaId.equals("Schema_0")) + return schemaValid; + + return new OutcomeValidator(schema); + } + + protected OutcomeValidator() { + errors = new StringBuffer(); + } + + public OutcomeValidator(Schema schema) throws InvalidDataException { + this.schema = schema; + + if (schema.docType.equals("Schema")) + throw new InvalidDataException("Use SchemaValidator to validate schema", ""); + + errors = new StringBuffer(); + Logger.msg(5, "Parsing "+schema.docType+" version "+schema.docVersion+". "+schema.schema.length()+" chars"); + + XMLGrammarPreparser preparser = new XMLGrammarPreparser(sym); + preparser.registerPreparser(XMLGrammarDescription.XML_SCHEMA, null); + preparser.setProperty(GRAMMAR_POOL, schemaGrammarPool); + + preparser.setFeature(NAMESPACES_FEATURE_ID, true); + preparser.setFeature(VALIDATION_FEATURE_ID, true); + preparser.setFeature(SCHEMA_VALIDATION_FEATURE_ID, true); + preparser.setFeature(SCHEMA_FULL_CHECKING_FEATURE_ID, true); + preparser.setErrorHandler(this); + try { + preparser.preparseGrammar(XMLGrammarDescription.XML_SCHEMA, new XMLInputSource(null, null, null, new StringReader(schema.schema), null)); + } catch (IOException ex) { + throw new InvalidDataException("Error parsing schema: "+ex.getMessage(), ""); + } + + if (errors.length() > 0) { + throw new InvalidDataException("Schema error: \n"+errors.toString(), ""); + } + + } + + public synchronized String validate(Outcome outcome) { + if (outcome == null) return "Outcome object was null"; + Logger.msg(5, "Validating outcome no "+outcome.getID()+" as "+schema.docType+" v"+schema.docVersion); + if (outcome.getSchemaType().equals(schema.docType) + && outcome.getSchemaVersion() == schema.docVersion) { + return validate(outcome.getData()); + } + else + return "Outcome type and version did not match schema "+schema.docType; + } + + public synchronized String validate(String outcome) { + if (outcome == null) return "Outcome String was null"; + errors = new StringBuffer(); + try { + XMLParserConfiguration parserConfiguration = new IntegratedParserConfiguration(sym, schemaGrammarPool); + parserConfiguration.setFeature(NAMESPACES_FEATURE_ID, true); + parserConfiguration.setFeature(VALIDATION_FEATURE_ID, true); + // now we can still do schema features just in case, + // so long as it's our configuraiton...... + parserConfiguration.setFeature(SCHEMA_VALIDATION_FEATURE_ID, true); + parserConfiguration.setFeature(SCHEMA_FULL_CHECKING_FEATURE_ID, true); + DOMParser parser = new DOMParser(parserConfiguration); + parser.setErrorHandler(this); + + parser.parse(new XMLInputSource(null, null, null, new StringReader(outcome), null)); + } catch (Exception e) { + return e.getMessage(); + } + return errors.toString(); + } + + private void appendError(String level, Exception ex) { + errors.append(level); + String message = ex.getMessage(); + if (message == null || message.length()==0) + message = ex.getClass().getName(); + errors.append(message); + errors.append("\n"); + } + + /** + * ErrorHandler for instances + */ + @Override + public void error(SAXParseException ex) throws SAXException { + appendError("ERROR: ", ex); + } + + /** + * + */ + @Override + public void fatalError(SAXParseException ex) throws SAXException { + appendError("FATAL: ", ex); + } + + /** + * + */ + @Override + public void warning(SAXParseException ex) throws SAXException { + appendError("WARNING: ", ex); + } + + /** + * XMLErrorHandler for schema + */ + @Override + public void error(String domain, String key, XMLParseException ex) + throws XNIException { + appendError("ERROR: ", ex); + } + + /** + * + */ + @Override + public void fatalError(String domain, String key, XMLParseException ex) + throws XNIException { + appendError("FATAL: ", ex); + } + + /** + * + */ + @Override + public void warning(String domain, String key, XMLParseException ex) + throws XNIException { + appendError("WARNING: ", ex); + } + +} diff --git a/src/main/java/com/c2kernel/persistency/outcome/Schema.java b/src/main/java/com/c2kernel/persistency/outcome/Schema.java new file mode 100644 index 0000000..73969f2 --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/outcome/Schema.java @@ -0,0 +1,18 @@ +package com.c2kernel.persistency.outcome; + +/** + * @author Andrew Branson + * + * $Revision: 1.3 $ + * $Date: 2006/09/14 14:13:26 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + */ + +public class Schema { + public String docType; + public int docVersion; + public boolean breakApart; + public String schema; + } diff --git a/src/main/java/com/c2kernel/persistency/outcome/SchemaValidator.java b/src/main/java/com/c2kernel/persistency/outcome/SchemaValidator.java new file mode 100644 index 0000000..be8564b --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/outcome/SchemaValidator.java @@ -0,0 +1,55 @@ +package com.c2kernel.persistency.outcome; + +import java.io.IOException; +import java.io.StringReader; + +import org.exolab.castor.xml.schema.reader.SchemaReader; +import org.xml.sax.InputSource; + + +/************************************************************************** + * + * $Revision: 1.2 $ + * $Date: 2005/04/26 06:48:13 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + + + +public class SchemaValidator extends OutcomeValidator { + + org.exolab.castor.xml.schema.Schema castorSchema; + /** + * + */ + + public SchemaValidator() { + + } + + public org.exolab.castor.xml.schema.Schema getSOM() { + return castorSchema; + } + + /** + * + */ + + @Override + public synchronized String validate(String outcome) { + errors = new StringBuffer(); + try { + InputSource schemaSource = new InputSource(new StringReader(outcome)); + SchemaReader mySchemaReader = new SchemaReader(schemaSource); + mySchemaReader.setErrorHandler(this); + mySchemaReader.setValidation(true); + castorSchema = mySchemaReader.read(); + } catch (IOException e) { + errors.append(e.getMessage()); + } + return errors.toString(); + } + +} diff --git a/src/main/java/com/c2kernel/persistency/outcome/Viewpoint.java b/src/main/java/com/c2kernel/persistency/outcome/Viewpoint.java new file mode 100644 index 0000000..a3fe283 --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/outcome/Viewpoint.java @@ -0,0 +1,180 @@ +package com.c2kernel.persistency.outcome; + +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.events.Event; +import com.c2kernel.lookup.Path; +import com.c2kernel.persistency.ClusterStorage; +import com.c2kernel.persistency.ClusterStorageException; +import com.c2kernel.process.Gateway; + +/** + * @author Andrew Branson + * + * $Revision: 1.10 $ + * $Date: 2005/10/05 07:39:36 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + */ + +// public static final String codeRevision = +// "$Revision: 1.10 $ $Date: 2005/10/05 07:39:36 $ $Author: abranson $"; +public class Viewpoint implements C2KLocalObject { + + int ID = -1; // not really used in this + + // db fields + int sysKey; + String schemaName; + String name; + int schemaVersion; + int eventId; + public static final int NONE = -1; + + public Viewpoint() { + eventId = NONE; + sysKey = Path.INVALID; + schemaVersion = NONE; + schemaName = null; + name = null; + } + + public Viewpoint(int sysKey, String schemaName, String name, int schemaVersion, int eventId) { + this.sysKey = sysKey; + this.schemaName = schemaName; + this.name = name; + this.schemaVersion = schemaVersion; + this.eventId = eventId; + } + + public Outcome getOutcome() throws ObjectNotFoundException, ClusterStorageException { + if (eventId == NONE) throw new ObjectNotFoundException("No last eventId defined", ""); + Outcome retVal = (Outcome)Gateway.getStorage().get(sysKey, ClusterStorage.OUTCOME+"/"+schemaName+"/"+schemaVersion+"/"+eventId, null); + return retVal; + } + + @Override + public String getClusterType() { + return ClusterStorage.VIEWPOINT; + } + + + /** + * Returns the eventId. + * @return int + */ + public int getEventId() { + return eventId; + } + + /** + * Returns the iD. + * @return int + */ + public int getID() { + return ID; + } + + /** + * Returns the name. + * @return String + */ + @Override + public String getName() { + return name; + } + + /** + * Returns the schemaName. + * @return String + */ + public String getSchemaName() { + return schemaName; + } + + /** + * Returns the schemaVersion. + * @return int + */ + public int getSchemaVersion() { + return schemaVersion; + } + + /** + * Returns the sysKey. + * @return int + */ + public int getSysKey() { + return sysKey; + } + + /** + * Sets the eventId. + * @param eventId The eventId to set + */ + public void setEventId(int eventId) { + this.eventId = eventId; + } + + /** + * Sets the iD. + * @param iD The iD to set + */ + public void setID(int iD) { + ID = iD; + } + + /** + * Sets the name. + * @param name The name to set + */ + @Override + public void setName(String name) { + this.name = name; + } + + /** + * Sets the schemaName. + * @param schemaName The schemaName to set + */ + public void setSchemaName(String schemaName) { + this.schemaName = schemaName; + } + + /** + * Sets the schemaVersion. + * @param schemaVersion The schemaVersion to set + */ + public void setSchemaVersion(int schemaVersion) { + this.schemaVersion = schemaVersion; + } + + /** + * Sets the sysKey. + * @param sysKey The sysKey to set + */ + public void setSysKey(int sysKey) { + this.sysKey = sysKey; + } + + /** + * Method getEvent. + * @return GDataRecord + */ + public Event getEvent() + throws InvalidDataException, ClusterStorageException, ObjectNotFoundException + { + if (eventId == NONE) + throw new InvalidDataException("No last eventId defined", ""); + + return (Event)Gateway.getStorage().get(sysKey, ClusterStorage.HISTORY+"/"+eventId, null); + } + + @Override + public String toString() { + return name; + } + +} -- cgit v1.2.3