diff options
Diffstat (limited to 'src/main/java/org/cristalise/kernel/persistency')
10 files changed, 2307 insertions, 0 deletions
diff --git a/src/main/java/org/cristalise/kernel/persistency/ClusterStorage.java b/src/main/java/org/cristalise/kernel/persistency/ClusterStorage.java new file mode 100644 index 0000000..ca43e67 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/persistency/ClusterStorage.java @@ -0,0 +1,296 @@ +/**
+ * This file is part of the CRISTAL-iSE kernel.
+ * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; either version 3 of the License, or (at
+ * your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ *
+ * http://www.fsf.org/licensing/licenses/lgpl.html
+ */
+package org.cristalise.kernel.persistency;
+
+import org.cristalise.kernel.collection.Collection;
+import org.cristalise.kernel.common.PersistencyException;
+import org.cristalise.kernel.common.SystemKey;
+import org.cristalise.kernel.entity.C2KLocalObject;
+import org.cristalise.kernel.lookup.ItemPath;
+import org.cristalise.kernel.persistency.outcome.Outcome;
+import org.cristalise.kernel.persistency.outcome.Viewpoint;
+import org.cristalise.kernel.process.auth.Authenticator;
+import org.cristalise.kernel.utils.Logger;
+
+
+/**
+ * <p>Interface for persistency managers of entities. It allows different kernel
+ * objects to be stored in different backend. For instance, Properties may be
+ * stored in LDAP, while Events, Outcomes and Viewpoints could be stored in a
+ * relational database. The kernel does and needs no analytical querying of the
+ * ClusterStorages, only simple gets and puts. This may be implemented on top
+ * of the storage implementation separately.
+ *
+ * <p>Each item is indexed by its {@link ItemPath}, which is may be constructed from its
+ * UUID, equivalent {@link SystemKey} object, or
+ *
+ * <p>Each first-level path under the Item is defined as a Cluster. Different
+ * Clusters may be stored in different places. Each ClusterStorage must support
+ * {@link #get(ItemPath, String)} and
+ * {@link #getClusterContents(ItemPath, String)} for clusters they return
+ * {@link #READ} and {@link #READWRITE} from queryClusterSupport and
+ * {@link #put(ItemPath, C2KLocalObject)} and {@link #delete(ItemPath, String)}
+ * for clusters they return {@link #WRITE} and {@link #READWRITE} from
+ * {@link #getClusterContents(ItemPath, String)}. Operations that have not been
+ * declared as not supported should throw a PersistencyException. If a
+ * cluster does not exist, get should return null, and delete should return with
+ * no action.
+ */
+public abstract class ClusterStorage {
+
+ /**
+ * Constant to return from {@link #queryClusterSupport(String)} for Cluster
+ * types this storage does not support.
+ */
+ public static final short NONE = 0;
+ /**
+ * Constant to return from {@link #queryClusterSupport(String)} for Cluster
+ * types this storage can read from a database but not write. An example
+ * would be pre-existing data in a database that is mapped to Items in some
+ * way.
+ */
+ public static final short READ = 1;
+ /**
+ * Constant to return from {@link #queryClusterSupport(String)} for Cluster
+ * types this storage can write to a database but not read. An example would
+ * be a realtime database export of data, which is transformed in an
+ * unrecoverable way for use in other systems.
+ */
+ public static final short WRITE = 2;
+ /**
+ * Constant to return from {@link #queryClusterSupport(String)} for data
+ * stores that CRISTAL may use for both reading and writing for the given
+ * Cluster type.
+ */
+ public static final short READWRITE = 3;
+
+ // Cluster types
+ /**
+ * The defined path of the root of the CRISTAL Kernel object cluster tree. A
+ * zero-length string.
+ */
+ public static final String ROOT = "";
+ /**
+ * The root of the Property object cluster. All Property paths start with
+ * this. Defined as "Property". Properties are stored underneath according
+ * to their name e.g. "Property/Name"
+ */
+ public static final String PROPERTY = "Property";
+ /**
+ * The root of the Collection object cluster. All Collection paths start
+ * with this. Defined as "Collection". Collections are stored underneath by
+ * name e.g. "Collection/Composition"
+ */
+ public static final String COLLECTION = "Collection";
+ /**
+ * The cluster which holds the Item workflow. Defined as "LifeCycle". Holds
+ * the workflow inside, which is named "workflow", hence
+ * "LifeCycle/workflow".
+ *
+ * @see org.cristalise.kernel.lifecycle.instance.Workflow
+ */
+ public static final String LIFECYCLE = "LifeCycle";
+ /**
+ * This cluster holds all outcomes of this Item. The path to each outcome is
+ * "Outcome/<i>Schema Name</i>/<i>Schema Version</i>/<i>Event ID</i>"
+ */
+ public static final String OUTCOME = "Outcome";
+ /**
+ * This is the cluster that contains all event for this Item. This cluster
+ * may be instantiated in a client as a History, which is a RemoteMap.
+ * Events are stored with their ID: "/AuditTrail/<i>Event ID</i>"
+ */
+ public static final String HISTORY = "AuditTrail";
+ /**
+ * This cluster contains all viewpoints. Its name is defined as "ViewPoint".
+ * The paths of viewpoint objects stored here follow this pattern:
+ * "ViewPoint/<i>Schema Name</i>/<i>Viewpoint Name</i>"
+ */
+ public static final String VIEWPOINT = "ViewPoint";
+ /**
+ * Agents store their persistent jobs in this cluster that have been pushed
+ * to them by activities configured to do so. The name is defined as "Job"
+ * and each new job received is assigned an integer ID one more than the
+ * highest already present.
+ */
+ public static final String JOB = "Job";
+
+ /**
+ * An array of all currently supported cluster types, for iterative
+ * purposes.
+ */
+ public static final String[] allClusterTypes = { PROPERTY, COLLECTION,
+ LIFECYCLE, OUTCOME, HISTORY, VIEWPOINT, JOB };
+
+ /**
+ * Connects to the storage. It must be possible to retrieve CRISTAL local
+ * objects after this method returns.
+ *
+ * @param auth
+ * The Authenticator instance that the user or server logged in
+ * with.
+ * @throws PersistencyException
+ * If storage initialization failed
+ */
+ public abstract void open(Authenticator auth)
+ throws PersistencyException;
+
+ /**
+ * Shuts down the storage. Data must be completely written to disk before
+ * this method returns, so the process can exit. No further gets or puts
+ * should follow.
+ *
+ * @throws PersistencyException
+ * If closing failed
+ */
+ public abstract void close() throws PersistencyException;
+
+ /**
+ * Declares whether or not this ClusterStorage can read or write a
+ * particular CRISTAL local object type.
+ *
+ * @param clusterType
+ * The Cluster type requested. Must be one of the Cluster type
+ * constants from this class.
+ * @return A ClusterStorage constant: NONE, READ, WRITE, or READWRITE
+ */
+ public abstract short queryClusterSupport(String clusterType);
+
+ /**
+ * @return A full name of this storage for logging
+ */
+ public abstract String getName();
+
+ /**
+ * @return A short code for this storage for reference
+ */
+ public abstract String getId();
+
+ /**
+ * Utility method to find the cluster for a particular Local Object (the
+ * first part of its path)
+ *
+ * @param Local
+ * object path
+ * @return The cluster to which it belongs
+ */
+ protected static String getClusterType(String path) {
+ try {
+ if (path == null || path.length() == 0)
+ return ClusterStorage.ROOT;
+ int start = path.charAt(0) == '/' ? 1 : 0;
+ int end = path.indexOf('/', start + 1);
+ if (end == -1)
+ end = path.length();
+ return path.substring(start, end);
+ } catch (Exception ex) {
+ Logger.error(ex);
+ return ClusterStorage.ROOT;
+ }
+ }
+
+ /**
+ * Gives the path for a local object. Varies by Cluster.
+ *
+ * @param C2KLocalObject
+ * @return Its path
+ */
+ public static String getPath(C2KLocalObject obj) {
+ String root = obj.getClusterType();
+ if (root == null)
+ return null; // no storage allowed
+ if (obj instanceof Outcome) {
+ Outcome oc = (Outcome) obj;
+ return root + "/" + oc.getSchemaType() + "/"
+ + oc.getSchemaVersion() + "/" + oc.getName();
+ } else if (obj instanceof Viewpoint) {
+ Viewpoint vp = (Viewpoint) obj;
+ return root + "/" + vp.getSchemaName() + "/" + vp.getName();
+ } else if (obj instanceof Collection) {
+ Collection<?> coll = (Collection<?>) obj;
+ return root + "/" + coll.getName() + "/" +coll.getVersionName();
+ }
+ return root + "/" + obj.getName();
+ }
+
+ /* object manipulation */
+
+ // retrieve object by path
+ /**
+ * Fetches a CRISTAL local object from storage
+ *
+ * @param itemPath
+ * The ItemPath of the containing Item
+ * @param path
+ * The path of the local object
+ * @return The C2KLocalObject, or null if the object was not found
+ * @throws PersistencyException
+ * when retrieval failed
+ */
+ public abstract C2KLocalObject get(ItemPath itemPath, String path)
+ throws PersistencyException;
+
+ /**
+ * Stores a CRISTAL local object. The path is automatically generated.
+ *
+ * @param itemPath
+ * The Item that the object will be stored under
+ * @param obj
+ * The C2KLocalObject to store
+ * @throws PersistencyException
+ * When storage fails
+ */
+ public abstract void put(ItemPath itemPath, C2KLocalObject obj)
+ throws PersistencyException;
+
+ /**
+ * Remove a CRISTAL local object from storage. This should be used sparingly
+ * and responsibly, as it violated traceability. Objects removed in this way
+ * are not expected to be recoverable.
+ *
+ * @param itemPath
+ * The containing Item
+ * @param path
+ * The path of the object to be removed
+ * @throws PersistencyException
+ * When deletion fails or is not allowed
+ */
+ public abstract void delete(ItemPath itemPath, String path)
+ throws PersistencyException;
+
+ // directory listing
+ /**
+ * Queries the local path below the given root and returns the possible next
+ * elements.
+ *
+ * @param itemPath
+ * The Item to query
+ * @param path
+ * The path within that Item to query. May be ClusterStorage.ROOT
+ * (empty String)
+ * @return A String array of the possible next path elements
+ * @throws PersistencyException
+ * When an error occurred during the query
+ */
+ public abstract String[] getClusterContents(ItemPath itemPath, String path)
+ throws PersistencyException;
+
+}
diff --git a/src/main/java/org/cristalise/kernel/persistency/ClusterStorageManager.java b/src/main/java/org/cristalise/kernel/persistency/ClusterStorageManager.java new file mode 100644 index 0000000..46485b6 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/persistency/ClusterStorageManager.java @@ -0,0 +1,413 @@ +/**
+ * This file is part of the CRISTAL-iSE kernel.
+ * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; either version 3 of the License, or (at
+ * your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ *
+ * http://www.fsf.org/licensing/licenses/lgpl.html
+ */
+package org.cristalise.kernel.persistency;
+
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.cristalise.kernel.common.ObjectNotFoundException;
+import org.cristalise.kernel.common.PersistencyException;
+import org.cristalise.kernel.entity.C2KLocalObject;
+import org.cristalise.kernel.entity.agent.JobList;
+import org.cristalise.kernel.entity.proxy.ProxyMessage;
+import org.cristalise.kernel.events.History;
+import org.cristalise.kernel.lookup.ItemPath;
+import org.cristalise.kernel.persistency.outcome.Outcome;
+import org.cristalise.kernel.persistency.outcome.Viewpoint;
+import org.cristalise.kernel.process.Gateway;
+import org.cristalise.kernel.process.auth.Authenticator;
+import org.cristalise.kernel.utils.Logger;
+import org.cristalise.kernel.utils.SoftCache;
+import org.cristalise.kernel.utils.WeakCache;
+
+
+/**
+ * instantiates ClusterStorages listed in properties file All read/write requests to storage pass through this object, which
+ * can query the capabilities of each declared storage, and channel requests accordingly. Transaction based.
+ *
+ * * @version $Revision: 1.62 $ $Date: 2006/02/01 13:27:46 $
+ * @author $Author: abranson $
+ */
+public class ClusterStorageManager {
+ HashMap<String, ClusterStorage> allStores = new HashMap<String, ClusterStorage>();
+ String[] clusterPriority = new String[0];
+ HashMap<String, ArrayList<ClusterStorage>> clusterWriters = new HashMap<String, ArrayList<ClusterStorage>>();
+ HashMap<String, ArrayList<ClusterStorage>> clusterReaders = new HashMap<String, ArrayList<ClusterStorage>>();
+ // we don't need a soft cache for the top level cache - the proxies and entities clear that when reaped
+ HashMap<ItemPath, Map<String, C2KLocalObject>> memoryCache = new HashMap<ItemPath, Map<String, C2KLocalObject>>();
+
+ /**
+ * Initialises all ClusterStorage handlers listed by class name in the property "ClusterStorages"
+ * This property is usually process specific, and so should be in the server/client.conf and not the connect file.
+ */
+ public ClusterStorageManager(Authenticator auth) throws PersistencyException {
+ Object clusterStorageProp = Gateway.getProperties().getObject("ClusterStorage");
+ if (clusterStorageProp == null || clusterStorageProp.equals("")) {
+ throw new PersistencyException("ClusterStorageManager.init() - no ClusterStorages defined. No persistency!");
+ }
+
+ ArrayList<ClusterStorage> rootStores;
+ if (clusterStorageProp instanceof String)
+ rootStores = instantiateStores((String)clusterStorageProp);
+ else if (clusterStorageProp instanceof ArrayList<?>) {
+ ArrayList<?> propStores = (ArrayList<?>)clusterStorageProp;
+ rootStores = new ArrayList<ClusterStorage>();
+ clusterPriority = new String[propStores.size()];
+ for (Object thisStore : propStores) {
+ if (thisStore instanceof ClusterStorage)
+ rootStores.add((ClusterStorage)thisStore);
+ else
+ throw new PersistencyException("Supplied ClusterStorage "+thisStore.toString()+" was not an instance of ClusterStorage");
+ }
+ }
+ else {
+ throw new PersistencyException("Unknown class of ClusterStorage property: "+clusterStorageProp.getClass().getName());
+ }
+
+ int clusterNo = 0;
+ for (ClusterStorage newStorage : rootStores) {
+ try {
+ newStorage.open(auth);
+ } catch (PersistencyException ex) {
+ Logger.error(ex);
+ throw new PersistencyException("ClusterStorageManager.init() - Error initialising storage handler " + newStorage.getClass().getName() +
+ ": " + ex.getMessage());
+ }
+ Logger.msg(5, "ClusterStorageManager.init() - Cluster storage " + newStorage.getClass().getName() +
+ " initialised successfully.");
+ allStores.put(newStorage.getId(), newStorage);
+ clusterPriority[clusterNo++] = newStorage.getId();
+ }
+ clusterReaders.put(ClusterStorage.ROOT, rootStores); // all storages are queried for clusters at the root level
+
+ }
+
+ public ArrayList<ClusterStorage> instantiateStores(String allClusters) throws PersistencyException {
+ ArrayList<ClusterStorage> rootStores = new ArrayList<ClusterStorage>();
+ StringTokenizer tok = new StringTokenizer(allClusters, ",");
+ clusterPriority = new String[tok.countTokens()];
+
+ while (tok.hasMoreTokens()) {
+ ClusterStorage newStorage = null;
+ String newStorageClass = tok.nextToken();
+ try {
+ try {
+ newStorage = (ClusterStorage)(Class.forName(newStorageClass).newInstance());
+ } catch (ClassNotFoundException ex2) {
+ newStorage = (ClusterStorage)(Class.forName("org.cristalise.storage."+newStorageClass).newInstance());
+ }
+ } catch (ClassNotFoundException ex) {
+ throw new PersistencyException("ClusterStorageManager.init() - The cluster storage handler class " + newStorageClass +
+ " could not be found.");
+ } catch (InstantiationException ex) {
+ throw new PersistencyException("ClusterStorageManager.init() - The cluster storage handler class " + newStorageClass +
+ " could not be instantiated.");
+ } catch (IllegalAccessException ex) {
+ throw new PersistencyException("ClusterStorageManager.init() - The cluster storage handler class " + newStorageClass +
+ " was not allowed to be instantiated.");
+ }
+ rootStores.add(newStorage);
+ }
+ return rootStores;
+ }
+
+ public void close() {
+ for (ClusterStorage thisStorage : allStores.values()) {
+ try {
+ thisStorage.close();
+ } catch (PersistencyException ex) {
+ Logger.error(ex);
+ }
+ }
+ }
+
+ /**
+ * Returns the loaded storage that declare that they can handle writing or reading the specified cluster name (e.g.
+ * Collection, Property) Must specify if the request is a read or a write.
+ */
+ private ArrayList<ClusterStorage> findStorages(String clusterType, boolean forWrite) {
+
+ // choose the right cache for readers or writers
+ HashMap<String, ArrayList<ClusterStorage>> cache;
+ if (forWrite)
+ cache = clusterWriters;
+ else
+ cache = clusterReaders;
+
+ // check to see if we've been asked to do this before
+ if (cache.containsKey(clusterType))
+ return cache.get(clusterType);
+
+ // not done yet, we'll have to query them all
+ Logger.msg(7, "ClusterStorageManager.findStorages() - finding storage for "+clusterType+" forWrite:"+forWrite);
+ ArrayList<ClusterStorage> useableStorages = new ArrayList<ClusterStorage>();
+ for (String element : clusterPriority) {
+ ClusterStorage thisStorage = allStores.get(element);
+ short requiredSupport = forWrite ? ClusterStorage.WRITE : ClusterStorage.READ;
+ if ((thisStorage.queryClusterSupport(clusterType) & requiredSupport) == requiredSupport) {
+ Logger.msg(7, "ClusterStorageManager.findStorages() - Got "+thisStorage.getName());
+ useableStorages.add(thisStorage);
+ }
+ }
+ cache.put(clusterType, useableStorages);
+ return useableStorages;
+ }
+
+ /**
+ * Retrieves the ids of the next level of a cluster
+ * Does not look in any currently open transactions.
+ */
+ public String[] getClusterContents(ItemPath itemPath, String path) throws PersistencyException {
+
+ ArrayList<String> contents = new ArrayList<String>();
+ // get all readers
+ Logger.msg(8, "ClusterStorageManager.getClusterContents() - Finding contents of "+path);
+ ArrayList<ClusterStorage> readers = findStorages(ClusterStorage.getClusterType(path), false);
+ // try each in turn until we get a result
+ for (ClusterStorage thisReader : readers) {
+ try {
+ String[] thisArr = thisReader.getClusterContents(itemPath, path);
+ if (thisArr != null) {
+ for (int j = 0; j < thisArr.length; j++)
+ if (!contents.contains(thisArr[j])) {
+ Logger.msg(9, "ClusterStorageManager.getClusterContents() - "+thisReader.getName()+" reports "+thisArr[j]);
+ contents.add(thisArr[j]);
+ }
+ }
+ } catch (PersistencyException e) {
+ Logger.msg(5, "ClusterStorageManager.getClusterContents() - reader " + thisReader.getName() +
+ " could not retrieve contents of " + itemPath + "/" + path + ": " + e.getMessage());
+ }
+ }
+
+ String[] retArr = new String[0];
+ retArr = contents.toArray(retArr);
+ return retArr;
+ }
+
+ /** Internal get method. Retrieves clusters from ClusterStorages & maintains the memory cache */
+ public C2KLocalObject get(ItemPath itemPath, String path) throws PersistencyException, ObjectNotFoundException {
+ C2KLocalObject result = null;
+ // check cache first
+ Map<String, C2KLocalObject> sysKeyMemCache = null;
+ sysKeyMemCache = memoryCache.get(itemPath);
+ if (sysKeyMemCache != null) {
+ synchronized(sysKeyMemCache) {
+ C2KLocalObject obj = sysKeyMemCache.get(path);
+ if (obj != null) {
+ Logger.msg(7, "ClusterStorageManager.get() - found "+itemPath+"/"+path+" in memcache");
+ return obj;
+ }
+ }
+ }
+
+ // special case - loading viewpoint contents
+ if (path.startsWith(ClusterStorage.VIEWPOINT) &&
+ path.endsWith("/data")) {
+ StringTokenizer tok = new StringTokenizer(path,"/");
+ if (tok.countTokens() == 4) { // to not catch viewpoints called 'data'
+ Outcome data = null;
+ Viewpoint view = (Viewpoint)get(itemPath, path.substring(0, path.lastIndexOf("/")));
+ if (view != null)
+ data = view.getOutcome();
+ return data;
+ }
+ }
+
+ // deal out top level remote maps
+ if (path.indexOf('/') == -1) {
+ if (path.equals(ClusterStorage.HISTORY))
+ result = new History(itemPath, null);
+ if (path.equals(ClusterStorage.JOB))
+ result = new JobList(itemPath, null);
+ if (result!=null) {
+ synchronized(sysKeyMemCache) {
+ sysKeyMemCache.put(path, result);
+ }
+ return result;
+ }
+
+ }
+
+ // else try each reader in turn until we find it
+ ArrayList<ClusterStorage> readers = findStorages(ClusterStorage.getClusterType(path), false);
+ for (ClusterStorage thisReader : readers) {
+ try {
+ result = thisReader.get(itemPath, path);
+ Logger.msg(7, "ClusterStorageManager.get() - reading "+path+" from "+thisReader.getName() + " for item " + itemPath);
+ if (result != null) { // got it!
+ // store it in the cache
+ if (sysKeyMemCache == null) { // create cache if needed
+ boolean useWeak = Gateway.getProperties().getBoolean("Storage.useWeakCache", false);
+ Logger.msg(7,"ClusterStorageManager.put() - Creating "+(useWeak?"Weak":"Strong")+" cache for item "+itemPath);
+ sysKeyMemCache = useWeak?new WeakCache<String, C2KLocalObject>():new SoftCache<String, C2KLocalObject>(0);
+ synchronized (memoryCache) {
+ memoryCache.put(itemPath, sysKeyMemCache);
+ }
+ }
+ synchronized(sysKeyMemCache) {
+ sysKeyMemCache.put(path, result);
+ }
+ // then return it
+ return result;
+ }
+ } catch (PersistencyException e) {
+ Logger.msg(7, "ClusterStorageManager.get() - reader " + thisReader.getName() + " could not retrieve " + itemPath +
+ "/" + path + ": " + e.getMessage());
+ }
+ }
+ throw new ObjectNotFoundException("ClusterStorageManager.get() - Path " + path + " not found in " + itemPath);
+ }
+
+ /** Internal put method. Creates or overwrites a cluster in all writers. Used when committing transactions. */
+ public void put(ItemPath itemPath, C2KLocalObject obj) throws PersistencyException {
+ String path = ClusterStorage.getPath(obj);
+ ArrayList<ClusterStorage> writers = findStorages(ClusterStorage.getClusterType(path), true);
+ for (ClusterStorage thisWriter : writers) {
+ try {
+ Logger.msg(7, "ClusterStorageManager.put() - writing "+path+" to "+thisWriter.getName());
+ thisWriter.put(itemPath, obj);
+ } catch (PersistencyException e) {
+ Logger.error("ClusterStorageManager.put() - writer " + thisWriter.getName() + " could not store " +
+ itemPath + "/" + path + ": " + e.getMessage());
+ throw e;
+ }
+ }
+ // put in mem cache if that worked
+ Map<String, C2KLocalObject> sysKeyMemCache;
+ if (memoryCache.containsKey(itemPath))
+ sysKeyMemCache = memoryCache.get(itemPath);
+ else {
+ boolean useWeak = Gateway.getProperties().getBoolean("Storage.useWeakCache", false);
+ Logger.msg(7,"ClusterStorageManager.put() - Creating "+(useWeak?"Weak":"Strong")+" cache for entity "+itemPath);
+ sysKeyMemCache = useWeak?new WeakCache<String, C2KLocalObject>():new SoftCache<String, C2KLocalObject>(0);
+ synchronized (memoryCache) {
+ memoryCache.put(itemPath, sysKeyMemCache);
+ }
+ }
+
+ synchronized(sysKeyMemCache) {
+ sysKeyMemCache.put(path, obj);
+ }
+
+ if (Logger.doLog(9)) dumpCacheContents(9);
+
+ // transmit proxy event
+ Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(itemPath, path, ProxyMessage.ADDED));
+ }
+
+ /** Deletes a cluster from all writers */
+ public void remove(ItemPath itemPath, String path) throws PersistencyException {
+ ArrayList<ClusterStorage> writers = findStorages(ClusterStorage.getClusterType(path), true);
+ for (ClusterStorage thisWriter : writers) {
+ try {
+ Logger.msg(7, "ClusterStorageManager.delete() - removing "+path+" from "+thisWriter.getName());
+ thisWriter.delete(itemPath, path);
+ } catch (PersistencyException e) {
+ Logger.error("ClusterStorageManager.delete() - writer " + thisWriter.getName() + " could not delete " + itemPath +
+ "/" + path + ": " + e.getMessage());
+ throw e;
+ }
+ }
+
+ if (memoryCache.containsKey(itemPath)) {
+ Map<String, C2KLocalObject> itemMemCache = memoryCache.get(itemPath);
+ synchronized (itemMemCache) {
+ itemMemCache.remove(path);
+ }
+ }
+
+
+ // transmit proxy event
+ Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(itemPath, path, ProxyMessage.DELETED));
+ }
+
+ public void clearCache(ItemPath itemPath, String path) {
+ Logger.msg(7, "CSM.clearCache() - removing "+itemPath+"/"+path);
+
+ if (memoryCache.containsKey(itemPath)) {
+ Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(itemPath);
+ synchronized(sysKeyMemCache) {
+ for (Iterator<String> iter = sysKeyMemCache.keySet().iterator(); iter.hasNext();) {
+ String thisPath = iter.next();
+ if (thisPath.startsWith(path)) {
+ Logger.msg(7, "CSM.clearCache() - removing "+itemPath+"/"+thisPath);
+ iter.remove();
+ }
+ }
+ }
+ }
+ }
+
+ public void clearCache(ItemPath itemPath) {
+
+ Logger.msg(5, "CSM.clearCache() - removing entire cache of "+itemPath);
+
+ if (memoryCache.containsKey(itemPath)) {
+ synchronized (memoryCache) {
+ if (Logger.doLog(6)) {
+ Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(itemPath);
+ int size = sysKeyMemCache.size();
+ Logger.msg(6, "CSM.clearCache() - "+size+" objects to remove.");
+ }
+ memoryCache.remove(itemPath);
+ }
+ }
+ else
+ Logger.msg(6, "CSM.clearCache() - No objects cached");
+ }
+
+ public void clearCache() {
+ synchronized (memoryCache) {
+ memoryCache.clear();
+ }
+ Logger.msg(5, "CSM.clearCache() - cleared entire cache, "+memoryCache.size()+" entities.");
+ }
+
+ public void dumpCacheContents(int logLevel) {
+ if (!Logger.doLog(logLevel)) return;
+ synchronized(memoryCache) {
+ for (ItemPath itemPath : memoryCache.keySet()) {
+ Logger.msg(logLevel, "Cached Objects of Entity "+itemPath);
+ Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(itemPath);
+ try {
+ synchronized(sysKeyMemCache) {
+ for (Object name : sysKeyMemCache.keySet()) {
+ String path = (String) name;
+ try {
+ Logger.msg(logLevel, " Path "+path+": "+sysKeyMemCache.get(path).getClass().getName());
+ } catch (NullPointerException e) {
+ Logger.msg(logLevel, " Path "+path+": reaped");
+ }
+ }
+ }
+ } catch (ConcurrentModificationException ex) {
+ Logger.msg(logLevel, "Cache modified - aborting");
+ }
+ }
+ Logger.msg(logLevel, "Total number of cached entities: "+memoryCache.size());
+ }
+ }
+}
diff --git a/src/main/java/org/cristalise/kernel/persistency/RemoteMap.java b/src/main/java/org/cristalise/kernel/persistency/RemoteMap.java new file mode 100644 index 0000000..deb8f0b --- /dev/null +++ b/src/main/java/org/cristalise/kernel/persistency/RemoteMap.java @@ -0,0 +1,398 @@ +/**
+ * This file is part of the CRISTAL-iSE kernel.
+ * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; either version 3 of the License, or (at
+ * your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ *
+ * http://www.fsf.org/licensing/licenses/lgpl.html
+ */
+package org.cristalise.kernel.persistency;
+
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.cristalise.kernel.common.ObjectNotFoundException;
+import org.cristalise.kernel.common.PersistencyException;
+import org.cristalise.kernel.entity.C2KLocalObject;
+import org.cristalise.kernel.entity.proxy.ItemProxy;
+import org.cristalise.kernel.entity.proxy.MemberSubscription;
+import org.cristalise.kernel.entity.proxy.ProxyObserver;
+import org.cristalise.kernel.lookup.ItemPath;
+import org.cristalise.kernel.process.Gateway;
+import org.cristalise.kernel.utils.Logger;
+
+
+/**
+ * Maps a storage cluster onto a java.util.Map
+ *
+ * @author Andrew Branson
+ * $Revision: 1.22 $
+ * $Date: 2006/03/03 13:52:21 $
+ *
+ * Copyright (C) 2003 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ */
+public class RemoteMap<V extends C2KLocalObject> extends TreeMap<String, V> implements C2KLocalObject {
+
+ private int mID=-1;
+ private String mName;
+ protected ItemPath mItemPath;
+ private String mPath = "";
+ Object keyLock = null;
+ TransactionManager storage;
+ ProxyObserver<V> listener;
+ Comparator<String> comp;
+ ItemProxy source;
+ Object mLocker; // if this remote map will participate in a transaction
+
+ public RemoteMap(ItemPath itemPath, String path, Object locker) {
+
+ super(new Comparator<String>() {
+ @Override
+ public int compare(String o1, String o2) {
+ Integer i1 = null, i2 = null;
+ try {
+ i1 = Integer.valueOf(o1);
+ i2 = Integer.valueOf(o2);
+ return i1.compareTo(i2);
+ } catch (NumberFormatException ex) { }
+ return o1.compareTo(o2);
+ }
+ });
+
+ mItemPath = itemPath;
+ mLocker = locker;
+
+ // split the path into path/name
+ int lastSlash = path.lastIndexOf("/");
+ mName = path.substring(lastSlash+1);
+ if (lastSlash>0) mPath = path.substring(0,lastSlash);
+
+ // see if the name is also a suitable id
+ try {
+ mID = Integer.parseInt(mName);
+ } catch (NumberFormatException e) {}
+ storage = Gateway.getStorage();
+
+ listener = new ProxyObserver<V>() {
+ @Override
+ public void add(V obj) {
+ synchronized (this) {
+ putLocal(obj.getName(), obj);
+ }
+ }
+
+ @Override
+ public void remove(String id) {
+ synchronized (this) {
+ removeLocal(id);
+ }
+ }
+
+ @Override
+ public void control(String control, String msg) { }
+ };
+
+ try {
+ source = Gateway.getProxyManager().getProxy(mItemPath);
+ source.subscribe(new MemberSubscription<V>(listener, path, false));
+ } catch (Exception ex) {
+ Logger.error("Error subscribing to remote map. Changes will not be received");
+ Logger.error(ex);
+ }
+ }
+
+ protected void loadKeys() {
+ if (keyLock != null) return;
+ clear();
+ keyLock = new Object();
+ synchronized(this) {
+ String[] keys;
+ try {
+ keys = storage.getClusterContents(mItemPath, mPath+mName);
+ for (String key : keys) super.put(key, null);
+ } catch (PersistencyException e) {
+ Logger.error(e);
+ }
+
+ }
+ }
+
+ public synchronized int getLastId() {
+ loadKeys();
+ if (size() == 0) return -1;
+ try {
+ return Integer.parseInt(lastKey());
+ } catch (NumberFormatException ex) {
+ return -1;
+ }
+ }
+
+
+ // c2kLocalObject methods
+ public void setID(int id) { mID = id; }
+
+ public int getID() { return mID; }
+
+ @Override
+ public void setName(String name) { mName = name; }
+
+ @Override
+ public String getName() { return mName; }
+
+ /**
+ * Cannot be stored
+ */
+ @Override
+ public String getClusterType() {
+ return null;
+ }
+ /**
+ * @see java.util.Map#clear()
+ */
+ @Override
+ public synchronized void clear() {
+ synchronized (this) {
+ super.clear();
+ }
+ keyLock = null;
+ }
+
+
+
+ /**
+ * @see java.util.Map#containsKey(Object)
+ */
+ @Override
+ public synchronized boolean containsKey(Object key) {
+ if (keyLock == null) loadKeys();
+ return super.containsKey(key);
+ }
+
+ /**
+ * This must retrieve all the values until a match is made.
+ * Very expensive, but if you must, you must.
+ * @see java.util.Map#containsValue(Object)
+ */
+ @Override
+ public synchronized boolean containsValue(Object value) {
+ loadKeys();
+ synchronized(this) {
+ for (String key: keySet()) {
+ if (get(key).equals(value)) return true;
+ }
+ }
+ return false;
+ }
+
+
+ /**
+ * @see java.util.Map#get(Object)
+ */
+ @Override
+ public synchronized V get(Object objKey) {
+ loadKeys();
+ String key;
+ if (objKey instanceof Integer)
+ key = ((Integer)objKey).toString();
+ else if (objKey instanceof String)
+ key = (String)objKey;
+ else
+ return null;
+
+ synchronized(this) {
+ try {
+ V value = super.get(key);
+ if (value == null) {
+ value = (V)storage.get(mItemPath, mPath+mName+"/"+key, mLocker);
+ super.put(key, value);
+ }
+ return value;
+ } catch (PersistencyException e) {
+ Logger.error(e);
+ } catch (ObjectNotFoundException e) {
+ Logger.error(e);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * @see java.util.Map#isEmpty()
+ */
+ @Override
+ public synchronized boolean isEmpty() {
+ loadKeys();
+ return super.isEmpty();
+ }
+
+ /**
+ * @see java.util.Map#keySet()
+ */
+ @Override
+ public synchronized Set<String> keySet() {
+ loadKeys();
+ return super.keySet();
+ }
+
+ /**
+ * Inserts the given object into the storage
+ * the key is ignored - it can be fetched from the value.
+ * @see java.util.Map#put(Object, Object)
+ */
+ @Override
+ public synchronized V put(String key, V value) {
+ try {
+ synchronized(this) {
+ storage.put(mItemPath, value, mLocker);
+ return putLocal(key, value);
+ }
+ } catch (PersistencyException e) {
+ Logger.error(e);
+ return null;
+ }
+ }
+
+ protected synchronized V putLocal(String key, V value) {
+ return super.put(key, value);
+ }
+
+ /**
+ * @see java.util.Map#remove(Object)
+ */
+ @Override
+ public synchronized V remove(Object key) {
+ loadKeys();
+ if (containsKey(key)) try {
+ synchronized(keyLock) {
+ storage.remove(mItemPath, mPath+mName+"/"+key, mLocker);
+ return super.remove(key);
+ }
+ } catch (PersistencyException e) {
+ Logger.error(e);
+ }
+ return null;
+ }
+
+ protected synchronized V removeLocal(Object key) {
+ return super.remove(key);
+ }
+
+ /**
+ * @see java.util.Map#size()
+ */
+ @Override
+ public synchronized int size() {
+ loadKeys();
+ return super.size();
+ }
+
+ /**
+ * @see java.util.Map#values()
+ */
+ @Override
+ public synchronized Collection<V> values() {
+ return new RemoteSet<V>(this);
+ }
+
+ /**
+ * Basic implementation of Set and Collection to bridge to the Iterator
+ * Disallows all writes.
+ */
+
+ private class RemoteSet<E extends C2KLocalObject> extends AbstractSet<E> {
+ RemoteMap<E> mParent;
+
+ public RemoteSet(RemoteMap<E> parent) {
+ mParent = parent;
+ }
+
+ // no modifications allowed
+ @Override
+ public boolean add(E o) {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public boolean addAll(Collection<? extends E> c) {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public boolean remove(Object o) {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ return new RemoteIterator<E>(mParent);
+ }
+
+ @Override
+ public int size() {
+ return mParent.size();
+ }
+
+ }
+ /**
+ * Iterator view on RemoteMap data. Doesn't preload anything.
+ * REVISIT: Will go strange if the RemoteMap is modified. Detect this and throw ConcurrentMod ex
+ */
+ private class RemoteIterator<C extends C2KLocalObject> implements Iterator<C> {
+ RemoteMap<C> mParent;
+ Iterator<String> iter;
+ String currentKey;
+
+ public RemoteIterator(RemoteMap<C> parent) {
+ mParent = parent;
+ iter = mParent.keySet().iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public C next() {
+ currentKey = iter.next();
+ return mParent.get(currentKey);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+
+
+
+}
diff --git a/src/main/java/org/cristalise/kernel/persistency/TransactionManager.java b/src/main/java/org/cristalise/kernel/persistency/TransactionManager.java new file mode 100644 index 0000000..7712da6 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/persistency/TransactionManager.java @@ -0,0 +1,353 @@ +/**
+ * This file is part of the CRISTAL-iSE kernel.
+ * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; either version 3 of the License, or (at
+ * your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ *
+ * http://www.fsf.org/licensing/licenses/lgpl.html
+ */
+package org.cristalise.kernel.persistency;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.cristalise.kernel.common.ObjectNotFoundException;
+import org.cristalise.kernel.common.PersistencyException;
+import org.cristalise.kernel.entity.C2KLocalObject;
+import org.cristalise.kernel.entity.agent.JobList;
+import org.cristalise.kernel.events.History;
+import org.cristalise.kernel.lookup.ItemPath;
+import org.cristalise.kernel.process.auth.Authenticator;
+import org.cristalise.kernel.utils.Logger;
+
+
+public class TransactionManager {
+
+ HashMap<ItemPath, Object> locks;
+ HashMap<Object, ArrayList<TransactionEntry>> pendingTransactions;
+ ClusterStorageManager storage;
+
+ public TransactionManager(Authenticator auth) throws PersistencyException {
+ storage = new ClusterStorageManager(auth);
+ locks = new HashMap<ItemPath, Object>();
+ pendingTransactions = new HashMap<Object, ArrayList<TransactionEntry>>();
+ }
+
+ public boolean hasPendingTransactions()
+ {
+ return pendingTransactions.size() > 0;
+ }
+
+ public ClusterStorageManager getDb() {
+ return storage;
+ }
+
+ public void close() {
+ if (pendingTransactions.size() != 0) {
+ Logger.error("There were pending transactions on shutdown. All changes were lost.");
+ dumpPendingTransactions(0);
+ }
+ Logger.msg("Transaction Manager: Closing storages");
+ storage.close();
+ }
+
+ public String[] getClusterContents(ItemPath itemPath, String path) throws PersistencyException {
+ if (path.startsWith("/") && path.length() > 1) path = path.substring(1);
+ return storage.getClusterContents(itemPath, path);
+ }
+
+ /**
+ * Public get method. Required a 'locker' object for a transaction key.
+ * Checks the transaction table first to see if the caller has uncommitted changes
+ */
+ public C2KLocalObject get(ItemPath itemPath, String path, Object locker)
+ throws PersistencyException,
+ ObjectNotFoundException {
+ if (path.startsWith("/") && path.length() > 1) path = path.substring(1);
+
+ // deal out top level remote maps, if transactions aren't needed
+ if (path.indexOf('/') == -1) {
+ if (path.equals(ClusterStorage.HISTORY) && locker != null)
+ return new History(itemPath, locker);
+ if (path.equals(ClusterStorage.JOB) && locker != null)
+ return new JobList(itemPath, locker);
+ }
+
+ // check to see if the locker has been modifying this cluster
+ if (locks.containsKey(itemPath) && locks.get(itemPath).equals(locker)) {
+ ArrayList<TransactionEntry> lockerTransaction = pendingTransactions.get(locker);
+ for (TransactionEntry thisEntry : lockerTransaction) {
+ if (itemPath.equals(thisEntry.itemPath) && path.equals(thisEntry.path)) {
+ if (thisEntry.obj == null)
+ throw new PersistencyException("ClusterStorageManager.get() - Cluster " + path + " has been deleted in " + itemPath +
+ " but not yet committed");
+ return thisEntry.obj;
+ }
+ }
+ }
+ return storage.get(itemPath, path);
+ }
+
+ /**
+ * Public put method. Manages the transaction table keyed by the object 'locker'.
+ * If this object is null, transaction support is bypassed (so long as no lock exists on that object).
+ */
+ public void put(ItemPath itemPath, C2KLocalObject obj, Object locker) throws PersistencyException {
+ Object tempLocker = null;
+ ArrayList<TransactionEntry> lockerTransaction;
+
+ synchronized(locks) {
+ // look to see if this object is already locked
+ if (locks.containsKey(itemPath)) {
+ // if it's this locker, get the transaction list
+ Object thisLocker = locks.get(itemPath);
+ if (thisLocker.equals(locker)) // retrieve the transaction list
+ lockerTransaction = pendingTransactions.get(locker);
+ else // locked by someone else
+ throw new PersistencyException("ClusterStorageManager.get() - Access denied: Object " + itemPath +
+ " has been locked for writing by " + thisLocker);
+ }
+ else { // no locks for this item
+ if (locker == null) { // lock the item until the non-transactional put is complete :/
+ tempLocker = new Object();
+ locks.put(itemPath, tempLocker);
+ lockerTransaction = null;
+ }
+ else { // initialise the transaction
+ locks.put(itemPath, locker);
+ lockerTransaction = new ArrayList<TransactionEntry>();
+ pendingTransactions.put(locker, lockerTransaction);
+ }
+ }
+ }
+
+ if (tempLocker != null) { // non-locking put/delete
+ storage.put(itemPath, obj);
+ locks.remove(itemPath);
+ return;
+ }
+
+ // create the new entry in the transaction table
+ TransactionEntry newEntry = new TransactionEntry(itemPath, obj);
+ /* equals() in TransactionEntry only compares sysKey and path, so we can use
+ * contains() in ArrayList to looks for preexisting entries for this cluster
+ * and overwrite them.
+ */
+ if (lockerTransaction.contains(newEntry))
+ lockerTransaction.remove(newEntry);
+ lockerTransaction.add(newEntry);
+ }
+
+ /** Public delete method. Uses the put method, with null as the object value.
+ */
+ public void remove(ItemPath itemPath, String path, Object locker) throws PersistencyException {
+ ArrayList<TransactionEntry> lockerTransaction;
+ Object tempLocker = null;
+ synchronized(locks) {
+ // look to see if this object is already locked
+ if (locks.containsKey(itemPath)) {
+ // if it's this locker, get the transaction list
+ Object thisLocker = locks.get(itemPath);
+ if (thisLocker.equals(locker)) // retrieve the transaction list
+ lockerTransaction = pendingTransactions.get(locker);
+ else // locked by someone else
+ throw new PersistencyException("ClusterStorageManager.get() - Access denied: Object " + itemPath +
+ " has been locked for writing by " + thisLocker);
+ }
+ else { // either we are the locker, or there is no locker
+ if (locker == null) { // non-locking put/delete
+ tempLocker = new Object();
+ locks.put(itemPath, tempLocker);
+ lockerTransaction = null;
+ }
+ else {// initialise the transaction
+ locks.put(itemPath, locker);
+ lockerTransaction = new ArrayList<TransactionEntry>();
+ pendingTransactions.put(locker, lockerTransaction);
+ }
+ }
+ }
+
+ if (tempLocker != null) {
+ storage.remove(itemPath, path);
+ locks.remove(itemPath);
+ return;
+ }
+
+ // create the new entry in the transaction table
+ TransactionEntry newEntry = new TransactionEntry(itemPath, path);
+ /* equals() in TransactionEntry only compares sysKey and path, so we can use
+ * contains() in ArrayList to looks for preexisting entries for this cluster
+ * and overwrite them.
+ */
+ if (lockerTransaction.contains(newEntry))
+ lockerTransaction.remove(newEntry);
+ lockerTransaction.add(newEntry);
+
+ }
+
+ /**
+ * Removes all child objects from the given path
+ *
+ * @param sysKey - entity to delete from
+ * @param path - root path to delete
+ * @param locker - locking object
+ *
+ * @throws PersistencyException - when deleting fails
+ */
+ public void removeCluster(ItemPath itemPath, String path, Object locker) throws PersistencyException {
+
+ String[] children = getClusterContents(itemPath, path);
+ for (String element : children)
+ removeCluster(itemPath, path+(path.length()>0?"/":"")+element, locker);
+ if (children.length==0 && path.indexOf("/") > -1)
+ remove(itemPath, path, locker);
+
+ }
+ /**
+ * Writes all pending changes to the backends.
+ */
+ public void commit(Object locker) {
+ synchronized(locks) {
+ ArrayList<TransactionEntry> lockerTransactions = pendingTransactions.get(locker);
+ HashMap<TransactionEntry, Exception> exceptions = new HashMap<TransactionEntry, Exception>();
+ // quit if no transactions are present;
+ if (lockerTransactions == null) return;
+ for (TransactionEntry thisEntry : lockerTransactions) {
+ try {
+ if (thisEntry.obj == null)
+ storage.remove(thisEntry.itemPath, thisEntry.path);
+ else
+ storage.put(thisEntry.itemPath, thisEntry.obj);
+ locks.remove(thisEntry.itemPath);
+ } catch (Exception e) {
+ exceptions.put(thisEntry, e);
+ }
+ }
+ pendingTransactions.remove(locker);
+ if (exceptions.size() > 0) { // oh dear
+ Logger.error("TransactionManager.commit() - Problems during transaction commit of locker "+locker.toString()+". Database may be in an inconsistent state.");
+ for (TransactionEntry entry : exceptions.keySet()) {
+ Exception ex = exceptions.get(entry);
+ Logger.msg(entry.toString());
+ Logger.error(ex);
+ }
+ dumpPendingTransactions(0);
+ Logger.die("Database failure");
+ }
+
+ }
+ }
+
+ /**
+ * Rolls back all changes sent in the name of 'locker' and unlocks the sysKeys
+ */
+ public void abort(Object locker) {
+ synchronized(locks) {
+ if (locks.containsValue(locker)) {
+ for (ItemPath thisPath : locks.keySet()) {
+ if (locks.get(thisPath).equals(locker))
+ locks.remove(thisPath);
+ }
+ }
+ pendingTransactions.remove(locker);
+ }
+ }
+
+ public void clearCache(ItemPath itemPath, String path) {
+ if (itemPath == null)
+ storage.clearCache();
+ else if (path == null)
+ storage.clearCache(itemPath);
+ else
+ storage.clearCache(itemPath, path);
+
+ }
+
+ public void dumpPendingTransactions(int logLevel) {
+ Logger.msg(logLevel, "================");
+ Logger.msg(logLevel, "Transaction dump");
+ Logger.msg(logLevel, "Locked Items:");
+ if (locks.size() == 0)
+ Logger.msg(logLevel, " None");
+ else
+ for (ItemPath thisPath : locks.keySet()) {
+ Object locker = locks.get(thisPath);
+ Logger.msg(logLevel, " "+thisPath+" locked by "+locker);
+ }
+
+ Logger.msg(logLevel, "Open transactions:");
+ if (pendingTransactions.size() == 0)
+ Logger.msg(logLevel, " None");
+ else
+ for (Object thisLocker : pendingTransactions.keySet()) {
+ Logger.msg(logLevel, " Transaction owner:"+thisLocker);
+ ArrayList<TransactionEntry> entries = pendingTransactions.get(thisLocker);
+ for (TransactionEntry thisEntry : entries) {
+ Logger.msg(logLevel, " "+thisEntry.toString());
+ }
+ }
+ }
+
+ /** Used in the transaction table to store details of a put until commit
+ */
+ static class TransactionEntry {
+ public ItemPath itemPath;
+ public String path;
+ public C2KLocalObject obj;
+ public TransactionEntry(ItemPath itemPath, C2KLocalObject obj) {
+ this.itemPath = itemPath;
+ this.path = ClusterStorage.getPath(obj);
+ this.obj = obj;
+ }
+
+ public TransactionEntry(ItemPath itemPath, String path) {
+ this.itemPath = itemPath;
+ this.path = path;
+ this.obj = null;
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer report = new StringBuffer();
+ if (obj == null)
+ report.append("Delete");
+ else
+ report.append("Put "+obj.getClass().getName());
+ report.append(" at ").append(path).append(" in ").append(itemPath);
+ return report.toString();
+
+ }
+ /**
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ return itemPath.hashCode()*path.hashCode();
+ }
+
+ /**
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof TransactionEntry)
+ return hashCode() == ((TransactionEntry)other).hashCode();
+ return false;
+ }
+
+ }
+
+}
diff --git a/src/main/java/org/cristalise/kernel/persistency/outcome/Outcome.java b/src/main/java/org/cristalise/kernel/persistency/outcome/Outcome.java new file mode 100644 index 0000000..95f3e6b --- /dev/null +++ b/src/main/java/org/cristalise/kernel/persistency/outcome/Outcome.java @@ -0,0 +1,290 @@ +/**
+ * This file is part of the CRISTAL-iSE kernel.
+ * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; either version 3 of the License, or (at
+ * your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ *
+ * http://www.fsf.org/licensing/licenses/lgpl.html
+ */
+package org.cristalise.kernel.persistency.outcome;
+import java.io.StringReader;
+import java.util.StringTokenizer;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+
+import org.cristalise.kernel.common.InvalidDataException;
+import org.cristalise.kernel.common.ObjectNotFoundException;
+import org.cristalise.kernel.common.PersistencyException;
+import org.cristalise.kernel.entity.C2KLocalObject;
+import org.cristalise.kernel.persistency.ClusterStorage;
+import org.cristalise.kernel.utils.LocalObjectLoader;
+import org.cristalise.kernel.utils.Logger;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+import org.w3c.dom.bootstrap.DOMImplementationRegistry;
+import org.w3c.dom.ls.DOMImplementationLS;
+import org.w3c.dom.ls.LSSerializer;
+import org.xml.sax.InputSource;
+
+
+public class Outcome implements C2KLocalObject {
+ Integer mID;
+ String mData;
+ String mSchemaType;
+ int mSchemaVersion;
+ Document dom;
+ static DocumentBuilder parser;
+ static DOMImplementationLS impl;
+ static XPath xpath;
+
+ static {
+ // Set up parser
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ dbf.setValidating(false);
+ dbf.setNamespaceAware(false);
+ try {
+ parser = dbf.newDocumentBuilder();
+ } catch (ParserConfigurationException e) {
+ Logger.error(e);
+ Logger.die("Cannot function without XML parser");
+ }
+
+ // Set up serialiser
+ try {
+ DOMImplementationRegistry registry = DOMImplementationRegistry.newInstance();
+ impl = (DOMImplementationLS)registry.getDOMImplementation("LS");
+ } catch (Exception e) {
+ Logger.error(e);
+ Logger.die("Cannot function without XML serialiser");
+ }
+
+ XPathFactory xPathFactory = XPathFactory.newInstance();
+ xpath = xPathFactory.newXPath();
+ }
+
+ //id is the eventID
+ public Outcome(int id, String data, String schemaType, int schemaVersion) {
+ mID = id;
+ mData = data;
+ mSchemaType = schemaType;
+ mSchemaVersion = schemaVersion;
+ }
+
+ public Outcome(String path, String data) throws PersistencyException {
+ // derive all the meta data from the path
+ StringTokenizer tok = new StringTokenizer(path,"/");
+ if (tok.countTokens() != 3 && !(tok.nextToken().equals(ClusterStorage.OUTCOME)))
+ throw new PersistencyException("Outcome() - Outcome path must have three components: "+path);
+ mSchemaType = tok.nextToken();
+ String verstring = tok.nextToken();
+ String objId = tok.nextToken();
+ try {
+ mSchemaVersion = Integer.parseInt(verstring);
+ } catch (NumberFormatException ex) {
+ throw new PersistencyException("Outcome() - Outcome version was an invalid number: "+verstring);
+ }
+ try {
+ mID = Integer.valueOf(objId);
+ } catch (NumberFormatException ex) {
+ mID = null;
+ }
+ mData = data;
+ }
+
+ public void setID(Integer ID) {
+ mID = ID;
+ }
+
+ public Integer getID() {
+ return mID;
+ }
+
+ @Override
+ public void setName(String name) {
+ try {
+ mID = Integer.valueOf(name);
+ } catch (NumberFormatException e) {
+ Logger.error("Invalid id set on Outcome:"+name);
+ }
+ }
+
+ @Override
+ public String getName() {
+ return mID.toString();
+ }
+
+ public void setData(String data) {
+ mData = data;
+ dom = null;
+ }
+
+ public void setData(Document data) {
+ dom = data;
+ mData = null;
+ }
+
+ public String getFieldByXPath(String xpath) throws XPathExpressionException, InvalidDataException {
+ Node field = getNodeByXPath(xpath);
+ if (field == null)
+ throw new InvalidDataException(xpath);
+
+ else if (field.getNodeType()==Node.TEXT_NODE || field.getNodeType()==Node.CDATA_SECTION_NODE)
+ return field.getNodeValue();
+
+ else if (field.getNodeType()==Node.ELEMENT_NODE) {
+ NodeList fieldChildren = field.getChildNodes();
+ if (fieldChildren.getLength() == 0)
+ throw new InvalidDataException("No child node for element");
+
+ else if (fieldChildren.getLength() == 1) {
+ Node child = fieldChildren.item(0);
+ if (child.getNodeType()==Node.TEXT_NODE || child.getNodeType()==Node.CDATA_SECTION_NODE)
+ return child.getNodeValue();
+ else
+ throw new InvalidDataException("Can't get data from child node of type "+child.getNodeName());
+ }
+ else
+ throw new InvalidDataException("Element "+xpath+" has too many children");
+ }
+ else if (field.getNodeType()==Node.ATTRIBUTE_NODE)
+ return field.getNodeValue();
+ else
+ throw new InvalidDataException("Don't know what to do with node "+field.getNodeName());
+ }
+
+ public void setFieldByXPath(String xpath, String data) throws XPathExpressionException, InvalidDataException {
+ Node field = getNodeByXPath(xpath);
+ if (field == null)
+ throw new InvalidDataException(xpath);
+
+ else if (field.getNodeType()==Node.ELEMENT_NODE) {
+ NodeList fieldChildren = field.getChildNodes();
+ if (fieldChildren.getLength() == 0) {
+ field.appendChild(dom.createTextNode(data));
+ }
+ else if (fieldChildren.getLength() == 1) {
+ Node child = fieldChildren.item(0);
+ switch (child.getNodeType()) {
+ case Node.TEXT_NODE:
+ case Node.CDATA_SECTION_NODE:
+ child.setNodeValue(data);
+ break;
+ default:
+ throw new InvalidDataException("Can't set child node of type "+child.getNodeName());
+ }
+ }
+ else
+ throw new InvalidDataException("Element "+xpath+" has too many children");
+ }
+ else if (field.getNodeType()==Node.ATTRIBUTE_NODE)
+ field.setNodeValue(data);
+ else
+ throw new InvalidDataException("Don't know what to do with node "+field.getNodeName());
+ }
+
+
+ public String getData() {
+ if (mData == null && dom != null) {
+ mData = serialize(dom, false);
+ }
+ return mData;
+ }
+
+ public Schema getSchema() throws ObjectNotFoundException {
+ return LocalObjectLoader.getSchema(mSchemaType, mSchemaVersion);
+ }
+
+ public void setSchemaType(String schemaType) {
+ mSchemaType = schemaType;
+ }
+
+ public String getSchemaType() {
+ return mSchemaType;
+ }
+
+ public int getSchemaVersion() {
+ return mSchemaVersion;
+ }
+
+ public void setSchemaVersion(int schVer) {
+ mSchemaVersion = schVer;
+ }
+
+ @Override
+ public String getClusterType() {
+ return ClusterStorage.OUTCOME;
+ }
+
+ // special script API methods
+
+ /**
+ * Parses the outcome into a DOM tree
+ * @return a DOM Document
+ */
+ public Document getDOM() {
+ if (dom == null)
+ try {
+ synchronized (parser) {
+ if (mData!=null)
+ dom = parser.parse(new InputSource(new StringReader(mData)));
+ else
+ dom = parser.newDocument();
+ }
+ } catch (Exception e) {
+ Logger.error(e);
+ return null;
+ }
+ return dom;
+ }
+
+ public String getField(String name) {
+ NodeList elements = getDOM().getDocumentElement().getElementsByTagName(name);
+ if (elements.getLength() == 1 && elements.item(0).hasChildNodes() && elements.item(0).getFirstChild() instanceof Text)
+ return ((Text)elements.item(0).getFirstChild()).getData();
+ else
+ return null;
+ }
+
+ public NodeList getNodesByXPath(String xpathExpr) throws XPathExpressionException {
+
+ XPathExpression expr = xpath.compile(xpathExpr);
+ return (NodeList)expr.evaluate(getDOM(), XPathConstants.NODESET);
+
+ }
+
+ public Node getNodeByXPath(String xpathExpr) throws XPathExpressionException {
+
+ XPathExpression expr = xpath.compile(xpathExpr);
+ return (Node)expr.evaluate(getDOM(), XPathConstants.NODE);
+
+ }
+
+ static public String serialize(Document doc, boolean prettyPrint)
+ {
+ LSSerializer writer = impl.createLSSerializer();
+ writer.getDomConfig().setParameter("format-pretty-print", prettyPrint);
+ writer.getDomConfig().setParameter("xml-declaration", false);
+ return writer.writeToString(doc);
+ }
+}
diff --git a/src/main/java/org/cristalise/kernel/persistency/outcome/OutcomeInitiator.java b/src/main/java/org/cristalise/kernel/persistency/outcome/OutcomeInitiator.java new file mode 100644 index 0000000..0a22fd0 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/persistency/outcome/OutcomeInitiator.java @@ -0,0 +1,31 @@ +/**
+ * This file is part of the CRISTAL-iSE kernel.
+ * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; either version 3 of the License, or (at
+ * your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ *
+ * http://www.fsf.org/licensing/licenses/lgpl.html
+ */
+package org.cristalise.kernel.persistency.outcome;
+
+import org.cristalise.kernel.common.InvalidDataException;
+import org.cristalise.kernel.entity.agent.Job;
+
+
+public interface OutcomeInitiator {
+
+ public String initOutcome(Job job) throws InvalidDataException;
+
+}
diff --git a/src/main/java/org/cristalise/kernel/persistency/outcome/OutcomeValidator.java b/src/main/java/org/cristalise/kernel/persistency/outcome/OutcomeValidator.java new file mode 100644 index 0000000..fed0c8c --- /dev/null +++ b/src/main/java/org/cristalise/kernel/persistency/outcome/OutcomeValidator.java @@ -0,0 +1,207 @@ +/**
+ * This file is part of the CRISTAL-iSE kernel.
+ * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; either version 3 of the License, or (at
+ * your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ *
+ * http://www.fsf.org/licensing/licenses/lgpl.html
+ */
+package org.cristalise.kernel.persistency.outcome;
+
+import java.io.IOException;
+import java.io.StringReader;
+
+import org.apache.xerces.parsers.DOMParser;
+import org.apache.xerces.parsers.IntegratedParserConfiguration;
+import org.apache.xerces.parsers.XMLGrammarPreparser;
+import org.apache.xerces.util.SymbolTable;
+import org.apache.xerces.util.XMLGrammarPoolImpl;
+import org.apache.xerces.xni.XNIException;
+import org.apache.xerces.xni.grammars.XMLGrammarDescription;
+import org.apache.xerces.xni.parser.XMLErrorHandler;
+import org.apache.xerces.xni.parser.XMLInputSource;
+import org.apache.xerces.xni.parser.XMLParseException;
+import org.apache.xerces.xni.parser.XMLParserConfiguration;
+import org.cristalise.kernel.common.InvalidDataException;
+import org.cristalise.kernel.utils.Logger;
+import org.xml.sax.ErrorHandler;
+import org.xml.sax.SAXException;
+import org.xml.sax.SAXParseException;
+
+
+/**************************************************************************
+ *
+ * $Revision: 1.24 $
+ * $Date: 2005/06/09 13:50:10 $
+ *
+ * Copyright (C) 2003 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+
+public class OutcomeValidator implements ErrorHandler, XMLErrorHandler {
+
+ protected static final String NAMESPACES_FEATURE_ID = "http://xml.org/sax/features/namespaces";
+ /** Validation feature id (http://xml.org/sax/features/validation). */
+ protected static final String VALIDATION_FEATURE_ID = "http://xml.org/sax/features/validation";
+ /** Schema validation feature id (http://apache.org/xml/features/validation/schema). */
+ protected static final String SCHEMA_VALIDATION_FEATURE_ID = "http://apache.org/xml/features/validation/schema";
+ /** Schema full checking feature id (http://apache.org/xml/features/validation/schema-full-checking). */
+ protected static final String SCHEMA_FULL_CHECKING_FEATURE_ID = "http://apache.org/xml/features/validation/schema-full-checking";
+ public static final String GRAMMAR_POOL = "http://apache.org/xml/properties/internal/grammar-pool";
+
+ static SchemaValidator schemaValid = new SchemaValidator();
+
+ Schema schema;
+ protected StringBuffer errors = null;
+ XMLGrammarPoolImpl schemaGrammarPool = new XMLGrammarPoolImpl(1);
+ SymbolTable sym = new SymbolTable();
+
+ public static OutcomeValidator getValidator(Schema schema) throws InvalidDataException {
+
+ if (schema.docType.equals("Schema") &&
+ schema.docVersion==0)
+ return schemaValid;
+
+ return new OutcomeValidator(schema);
+ }
+
+ protected OutcomeValidator() {
+ errors = new StringBuffer();
+ }
+
+ public OutcomeValidator(Schema schema) throws InvalidDataException {
+ this.schema = schema;
+
+ if (schema.docType.equals("Schema"))
+ throw new InvalidDataException("Use SchemaValidator to validate schema");
+
+ errors = new StringBuffer();
+ Logger.msg(5, "Parsing "+schema.docType+" version "+schema.docVersion+". "+schema.schema.length()+" chars");
+
+ XMLGrammarPreparser preparser = new XMLGrammarPreparser(sym);
+ preparser.registerPreparser(XMLGrammarDescription.XML_SCHEMA, null);
+ preparser.setProperty(GRAMMAR_POOL, schemaGrammarPool);
+
+ preparser.setFeature(NAMESPACES_FEATURE_ID, true);
+ preparser.setFeature(VALIDATION_FEATURE_ID, true);
+ preparser.setFeature(SCHEMA_VALIDATION_FEATURE_ID, true);
+ preparser.setFeature(SCHEMA_FULL_CHECKING_FEATURE_ID, true);
+ preparser.setErrorHandler(this);
+ try {
+ preparser.preparseGrammar(XMLGrammarDescription.XML_SCHEMA, new XMLInputSource(null, null, null, new StringReader(schema.schema), null));
+ } catch (IOException ex) {
+ throw new InvalidDataException("Error parsing schema: "+ex.getMessage());
+ }
+
+ if (errors.length() > 0) {
+ throw new InvalidDataException("Schema error: \n"+errors.toString());
+ }
+
+ }
+
+ public synchronized String validate(Outcome outcome) {
+ if (outcome == null) return "Outcome object was null";
+ Logger.msg(5, "Validating outcome no "+outcome.getID()+" as "+schema.docType+" v"+schema.docVersion);
+ if (outcome.getSchemaType().equals(schema.docType)
+ && outcome.getSchemaVersion() == schema.docVersion) {
+ return validate(outcome.getData());
+ }
+ else
+ return "Outcome type and version did not match schema "+schema.docType;
+ }
+
+ public synchronized String validate(String outcome) {
+ if (outcome == null) return "Outcome String was null";
+ errors = new StringBuffer();
+ try {
+ XMLParserConfiguration parserConfiguration = new IntegratedParserConfiguration(sym, schemaGrammarPool);
+ parserConfiguration.setFeature(NAMESPACES_FEATURE_ID, true);
+ parserConfiguration.setFeature(VALIDATION_FEATURE_ID, true);
+ // now we can still do schema features just in case,
+ // so long as it's our configuraiton......
+ parserConfiguration.setFeature(SCHEMA_VALIDATION_FEATURE_ID, true);
+ parserConfiguration.setFeature(SCHEMA_FULL_CHECKING_FEATURE_ID, true);
+ DOMParser parser = new DOMParser(parserConfiguration);
+ parser.setErrorHandler(this);
+
+ parser.parse(new XMLInputSource(null, null, null, new StringReader(outcome), null));
+ } catch (Exception e) {
+ return e.getMessage();
+ }
+ return errors.toString();
+ }
+
+ private void appendError(String level, Exception ex) {
+ errors.append(level);
+ String message = ex.getMessage();
+ if (message == null || message.length()==0)
+ message = ex.getClass().getName();
+ errors.append(message);
+ errors.append("\n");
+ }
+
+ /**
+ * ErrorHandler for instances
+ */
+ @Override
+ public void error(SAXParseException ex) throws SAXException {
+ appendError("ERROR: ", ex);
+ }
+
+ /**
+ *
+ */
+ @Override
+ public void fatalError(SAXParseException ex) throws SAXException {
+ appendError("FATAL: ", ex);
+ }
+
+ /**
+ *
+ */
+ @Override
+ public void warning(SAXParseException ex) throws SAXException {
+ appendError("WARNING: ", ex);
+ }
+
+ /**
+ * XMLErrorHandler for schema
+ */
+ @Override
+ public void error(String domain, String key, XMLParseException ex)
+ throws XNIException {
+ appendError("ERROR: ", ex);
+ }
+
+ /**
+ *
+ */
+ @Override
+ public void fatalError(String domain, String key, XMLParseException ex)
+ throws XNIException {
+ appendError("FATAL: ", ex);
+ }
+
+ /**
+ *
+ */
+ @Override
+ public void warning(String domain, String key, XMLParseException ex)
+ throws XNIException {
+ appendError("WARNING: ", ex);
+ }
+
+}
diff --git a/src/main/java/org/cristalise/kernel/persistency/outcome/Schema.java b/src/main/java/org/cristalise/kernel/persistency/outcome/Schema.java new file mode 100644 index 0000000..4ae6ef3 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/persistency/outcome/Schema.java @@ -0,0 +1,73 @@ +/**
+ * This file is part of the CRISTAL-iSE kernel.
+ * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; either version 3 of the License, or (at
+ * your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ *
+ * http://www.fsf.org/licensing/licenses/lgpl.html
+ */
+package org.cristalise.kernel.persistency.outcome;
+
+import java.io.IOException;
+import java.io.StringReader;
+
+import org.exolab.castor.xml.schema.reader.SchemaReader;
+import org.xml.sax.ErrorHandler;
+import org.xml.sax.InputSource;
+
+/**
+ * @author Andrew Branson
+ *
+ * $Revision: 1.3 $
+ * $Date: 2006/09/14 14:13:26 $
+ *
+ * Copyright (C) 2003 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ */
+
+public class Schema {
+ public String docType;
+ public int docVersion;
+ public String schema;
+ public org.exolab.castor.xml.schema.Schema som;
+
+ /**
+ * @param docType
+ * @param docVersion
+ * @param schema
+ */
+ public Schema(String docType, int docVersion, String schema) {
+ super();
+ this.docType = docType;
+ this.docVersion = docVersion;
+ this.schema = schema;
+ }
+
+ public Schema(String schema) {
+ this.schema = schema;
+ }
+
+ public org.exolab.castor.xml.schema.Schema parse(ErrorHandler errorHandler) throws IOException {
+ InputSource schemaSource = new InputSource(new StringReader(schema));
+ SchemaReader mySchemaReader = new SchemaReader(schemaSource);
+ if (errorHandler!= null) {
+ mySchemaReader.setErrorHandler(errorHandler);
+ mySchemaReader.setValidation(true);
+ }
+ som = mySchemaReader.read();
+ return som;
+ }
+
+}
diff --git a/src/main/java/org/cristalise/kernel/persistency/outcome/SchemaValidator.java b/src/main/java/org/cristalise/kernel/persistency/outcome/SchemaValidator.java new file mode 100644 index 0000000..b9684d2 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/persistency/outcome/SchemaValidator.java @@ -0,0 +1,55 @@ +/**
+ * This file is part of the CRISTAL-iSE kernel.
+ * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; either version 3 of the License, or (at
+ * your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ *
+ * http://www.fsf.org/licensing/licenses/lgpl.html
+ */
+package org.cristalise.kernel.persistency.outcome;
+
+import java.io.IOException;
+
+
+/**************************************************************************
+ *
+ * $Revision: 1.2 $
+ * $Date: 2005/04/26 06:48:13 $
+ *
+ * Copyright (C) 2003 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+
+
+public class SchemaValidator extends OutcomeValidator {
+
+ public SchemaValidator() {
+
+ }
+
+ @Override
+ public synchronized String validate(String outcome) {
+ errors = new StringBuffer();
+ Schema schema = new Schema(outcome);
+ try {
+ schema.parse(this);
+ } catch (IOException e) {
+ errors.append(e.getMessage());
+ }
+ return errors.toString();
+ }
+
+}
diff --git a/src/main/java/org/cristalise/kernel/persistency/outcome/Viewpoint.java b/src/main/java/org/cristalise/kernel/persistency/outcome/Viewpoint.java new file mode 100644 index 0000000..630975a --- /dev/null +++ b/src/main/java/org/cristalise/kernel/persistency/outcome/Viewpoint.java @@ -0,0 +1,191 @@ +/**
+ * This file is part of the CRISTAL-iSE kernel.
+ * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; either version 3 of the License, or (at
+ * your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ *
+ * http://www.fsf.org/licensing/licenses/lgpl.html
+ */
+package org.cristalise.kernel.persistency.outcome;
+
+import org.cristalise.kernel.common.InvalidDataException;
+import org.cristalise.kernel.common.ObjectNotFoundException;
+import org.cristalise.kernel.common.PersistencyException;
+import org.cristalise.kernel.entity.C2KLocalObject;
+import org.cristalise.kernel.events.Event;
+import org.cristalise.kernel.lookup.InvalidItemPathException;
+import org.cristalise.kernel.lookup.ItemPath;
+import org.cristalise.kernel.persistency.ClusterStorage;
+import org.cristalise.kernel.process.Gateway;
+
+
+/**
+ * @author Andrew Branson
+ *
+ * $Revision: 1.10 $
+ * $Date: 2005/10/05 07:39:36 $
+ *
+ * Copyright (C) 2003 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ */
+
+public class Viewpoint implements C2KLocalObject {
+
+ // db fields
+ ItemPath itemPath;
+ String schemaName;
+ String name;
+ int schemaVersion;
+ int eventId;
+ public static final int NONE = -1;
+
+ public Viewpoint() {
+ eventId = NONE;
+ itemPath = null;
+ schemaVersion = NONE;
+ schemaName = null;
+ name = null;
+ }
+
+ public Viewpoint(ItemPath itemPath, String schemaName, String name, int schemaVersion, int eventId) {
+ this.itemPath = itemPath;
+ this.schemaName = schemaName;
+ this.name = name;
+ this.schemaVersion = schemaVersion;
+ this.eventId = eventId;
+ }
+
+ public Outcome getOutcome() throws ObjectNotFoundException, PersistencyException {
+ if (eventId == NONE) throw new ObjectNotFoundException("No last eventId defined");
+ Outcome retVal = (Outcome)Gateway.getStorage().get(itemPath, ClusterStorage.OUTCOME+"/"+schemaName+"/"+schemaVersion+"/"+eventId, null);
+ return retVal;
+ }
+
+ @Override
+ public String getClusterType() {
+ return ClusterStorage.VIEWPOINT;
+ }
+
+
+ /**
+ * Returns the eventId.
+ * @return int
+ */
+ public int getEventId() {
+ return eventId;
+ }
+
+ /**
+ * Returns the name.
+ * @return String
+ */
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns the schemaName.
+ * @return String
+ */
+ public String getSchemaName() {
+ return schemaName;
+ }
+
+ /**
+ * Returns the schemaVersion.
+ * @return int
+ */
+ public int getSchemaVersion() {
+ return schemaVersion;
+ }
+
+ /**
+ * Returns the sysKey.
+ * @return int
+ */
+ public ItemPath getItemPath() {
+ return itemPath;
+ }
+
+ /**
+ * Sets the eventId.
+ * @param eventId The eventId to set
+ */
+ public void setEventId(int eventId) {
+ this.eventId = eventId;
+ }
+
+ /**
+ * Sets the name.
+ * @param name The name to set
+ */
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Sets the schemaName.
+ * @param schemaName The schemaName to set
+ */
+ public void setSchemaName(String schemaName) {
+ this.schemaName = schemaName;
+ }
+
+ /**
+ * Sets the schemaVersion.
+ * @param schemaVersion The schemaVersion to set
+ */
+ public void setSchemaVersion(int schemaVersion) {
+ this.schemaVersion = schemaVersion;
+ }
+
+ /**
+ * Sets the sysKey.
+ * @param sysKey The sysKey to set
+ */
+ public void setItemPath(ItemPath itemPath) {
+ this.itemPath = itemPath;
+ }
+
+ public void setItemUUID( String uuid ) throws InvalidItemPathException
+ {
+ setItemPath(new ItemPath(uuid));
+ }
+
+ public String getItemUUID() {
+ return getItemPath().getUUID().toString();
+ }
+
+ /**
+ * Method getEvent.
+ * @return GDataRecord
+ */
+ public Event getEvent()
+ throws InvalidDataException, PersistencyException, ObjectNotFoundException
+ {
+ if (eventId == NONE)
+ throw new InvalidDataException("No last eventId defined");
+
+ return (Event)Gateway.getStorage().get(itemPath, ClusterStorage.HISTORY+"/"+eventId, null);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+}
|
