diff options
| author | ogattaz <olivier@gattaz.com> | 2014-06-05 16:51:07 +0200 |
|---|---|---|
| committer | ogattaz <olivier@gattaz.com> | 2014-06-05 16:51:07 +0200 |
| commit | 2fd193d7936084de91eae46e8c2763914d87ab71 (patch) | |
| tree | b136ed97e535f11d4b3433d16c26570c89430ce4 /src/main/java/com/c2kernel/persistency | |
| parent | 1225792532f77e6e8f4a9addfc0c0a6cf56e89b8 (diff) | |
| parent | e73468fd08cc27aa31f76a27c916e45d5987c628 (diff) | |
Merge branch 'master' of ssh://dev.cccs.uwe.ac.uk:22/var/git/cristal-kernel
Diffstat (limited to 'src/main/java/com/c2kernel/persistency')
11 files changed, 276 insertions, 99 deletions
diff --git a/src/main/java/com/c2kernel/persistency/ClusterStorage.java b/src/main/java/com/c2kernel/persistency/ClusterStorage.java index 9c18bb4..76aaf1e 100644 --- a/src/main/java/com/c2kernel/persistency/ClusterStorage.java +++ b/src/main/java/com/c2kernel/persistency/ClusterStorage.java @@ -3,6 +3,7 @@ package com.c2kernel.persistency; import com.c2kernel.entity.C2KLocalObject;
import com.c2kernel.persistency.outcome.Outcome;
import com.c2kernel.persistency.outcome.Viewpoint;
+import com.c2kernel.process.auth.Authenticator;
import com.c2kernel.utils.Logger;
/** Interface for persistency managers of entities. It allows different kernel objects to be stored in different backend. For instance,
@@ -85,7 +86,7 @@ public abstract class ClusterStorage { public static final String[] allClusterTypes = { PROPERTY, COLLECTION, LIFECYCLE, OUTCOME, HISTORY, VIEWPOINT, JOB };
// connection maintenance
- public abstract void open()
+ public abstract void open(Authenticator auth)
throws ClusterStorageException;
public abstract void close()
throws ClusterStorageException;
diff --git a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java index 402c466..c9ede04 100644 --- a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java +++ b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java @@ -10,12 +10,12 @@ import java.util.StringTokenizer; import com.c2kernel.common.ObjectNotFoundException;
import com.c2kernel.entity.C2KLocalObject;
import com.c2kernel.entity.agent.JobList;
-import com.c2kernel.entity.proxy.EntityProxyManager;
import com.c2kernel.entity.proxy.ProxyMessage;
import com.c2kernel.events.History;
import com.c2kernel.persistency.outcome.Outcome;
import com.c2kernel.persistency.outcome.Viewpoint;
import com.c2kernel.process.Gateway;
+import com.c2kernel.process.auth.Authenticator;
import com.c2kernel.utils.Logger;
import com.c2kernel.utils.SoftCache;
import com.c2kernel.utils.WeakCache;
@@ -39,7 +39,7 @@ public class ClusterStorageManager { * Initialises all ClusterStorage handlers listed by class name in the property "ClusterStorages"
* This property is usually process specific, and so should be in the server/client.conf and not the connect file.
*/
- public ClusterStorageManager() throws ClusterStorageException {
+ public ClusterStorageManager(Authenticator auth) throws ClusterStorageException {
Object clusterStorageProp = Gateway.getProperties().getObject("ClusterStorage");
if (clusterStorageProp == null || clusterStorageProp.equals("")) {
throw new ClusterStorageException("ClusterStorageManager.init() - no ClusterStorages defined. No persistency!");
@@ -65,7 +65,7 @@ public class ClusterStorageManager { int clusterNo = 0;
for (ClusterStorage newStorage : rootStores) {
try {
- newStorage.open();
+ newStorage.open(auth);
} catch (ClusterStorageException ex) {
Logger.error(ex);
throw new ClusterStorageException("ClusterStorageManager.init() - Error initialising storage handler " + newStorage.getClass().getName() +
@@ -173,7 +173,7 @@ public class ClusterStorageManager { }
}
} catch (ClusterStorageException e) {
- Logger.error("ClusterStorageManager.getClusterContents() - reader " + thisReader.getName() +
+ Logger.msg(5, "ClusterStorageManager.getClusterContents() - reader " + thisReader.getName() +
" could not retrieve contents of " + sysKey + "/" + path + ": " + e.getMessage());
}
}
@@ -291,7 +291,7 @@ public class ClusterStorageManager { if (Logger.doLog(9)) dumpCacheContents(9);
// transmit proxy event
- EntityProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED));
+ Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED));
}
/** Deletes a cluster from all writers */
@@ -317,7 +317,7 @@ public class ClusterStorageManager { // transmit proxy event
- EntityProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED));
+ Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED));
}
public void clearCache(Integer sysKeyIntObj, String path) {
diff --git a/src/main/java/com/c2kernel/persistency/LDAPClusterStorage.java b/src/main/java/com/c2kernel/persistency/LDAPClusterStorage.java index 5a305f9..4762a33 100644 --- a/src/main/java/com/c2kernel/persistency/LDAPClusterStorage.java +++ b/src/main/java/com/c2kernel/persistency/LDAPClusterStorage.java @@ -4,10 +4,13 @@ import java.util.StringTokenizer; import com.c2kernel.common.ObjectNotFoundException;
import com.c2kernel.entity.C2KLocalObject;
-import com.c2kernel.lookup.EntityPath;
-import com.c2kernel.lookup.InvalidEntityPathException;
-import com.c2kernel.lookup.LDAPPropertyManager;
+import com.c2kernel.lookup.InvalidItemPathException;
+import com.c2kernel.lookup.ItemPath;
+import com.c2kernel.lookup.Lookup;
+import com.c2kernel.lookup.ldap.LDAPLookup;
+import com.c2kernel.lookup.ldap.LDAPPropertyManager;
import com.c2kernel.process.Gateway;
+import com.c2kernel.process.auth.Authenticator;
import com.c2kernel.property.Property;
import com.c2kernel.utils.Logger;
@@ -15,8 +18,12 @@ public class LDAPClusterStorage extends ClusterStorage { LDAPPropertyManager ldapStore;
@Override
- public void open() throws ClusterStorageException {
- ldapStore = Gateway.getLDAPLookup().getPropManager();
+ public void open(Authenticator auth) throws ClusterStorageException {
+ Lookup lookup = Gateway.getLookup();
+ if (lookup instanceof LDAPLookup)
+ ldapStore = ((LDAPLookup)lookup).getPropManager();
+ else
+ throw new ClusterStorageException("Cannot use LDAP cluster storage without LDAP Lookup");
}
@@ -53,10 +60,10 @@ public class LDAPClusterStorage extends ClusterStorage { throw new ClusterStorageException("Path length was invalid: "+path);
String type = tok.nextToken();
- EntityPath thisEntity;
+ ItemPath thisEntity;
try {
- thisEntity = new EntityPath(sysKey.intValue());
- } catch (InvalidEntityPathException e) {
+ thisEntity = new ItemPath(sysKey.intValue());
+ } catch (InvalidItemPathException e) {
throw new ClusterStorageException("Invalid Syskey:"+sysKey);
}
@@ -84,10 +91,10 @@ public class LDAPClusterStorage extends ClusterStorage { String type = obj.getClusterType();
- EntityPath thisEntity;
+ ItemPath thisEntity;
try {
- thisEntity = new EntityPath(sysKey.intValue());
- } catch (InvalidEntityPathException e) {
+ thisEntity = new ItemPath(sysKey.intValue());
+ } catch (InvalidItemPathException e) {
throw new ClusterStorageException("Invalid Syskey:"+sysKey);
}
@@ -112,10 +119,10 @@ public class LDAPClusterStorage extends ClusterStorage { throw new ClusterStorageException("Path length was invalid: "+path);
String type = tok.nextToken();
- EntityPath thisEntity;
+ ItemPath thisEntity;
try {
- thisEntity = new EntityPath(sysKey.intValue());
- } catch (InvalidEntityPathException e) {
+ thisEntity = new ItemPath(sysKey.intValue());
+ } catch (InvalidItemPathException e) {
throw new ClusterStorageException("Invalid Syskey:"+sysKey);
}
@@ -146,7 +153,7 @@ public class LDAPClusterStorage extends ClusterStorage { String type = getClusterType(path);
try
{
- EntityPath thisEntity = new EntityPath(sysKey.intValue());
+ ItemPath thisEntity = new ItemPath(sysKey.intValue());
if (type.equals(PROPERTY))
return ldapStore.getPropertyNames(thisEntity);
else
@@ -160,7 +167,7 @@ public class LDAPClusterStorage extends ClusterStorage { }
else
throw new ClusterStorageException("Cluster type "+type+" not supported.");
- } catch (InvalidEntityPathException e) {
+ } catch (InvalidItemPathException e) {
throw new ClusterStorageException("Invalid Syskey:"+sysKey);
} catch (ObjectNotFoundException e) {
throw new ClusterStorageException("Entity "+sysKey+" does not exist");
diff --git a/src/main/java/com/c2kernel/persistency/MemoryOnlyClusterStorage.java b/src/main/java/com/c2kernel/persistency/MemoryOnlyClusterStorage.java index a895c32..cd5d122 100644 --- a/src/main/java/com/c2kernel/persistency/MemoryOnlyClusterStorage.java +++ b/src/main/java/com/c2kernel/persistency/MemoryOnlyClusterStorage.java @@ -1,10 +1,13 @@ package com.c2kernel.persistency;
import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import com.c2kernel.entity.C2KLocalObject;
+import com.c2kernel.process.auth.Authenticator;
+import com.c2kernel.utils.Logger;
public class MemoryOnlyClusterStorage extends ClusterStorage {
@@ -17,7 +20,7 @@ public class MemoryOnlyClusterStorage extends ClusterStorage { }
@Override
- public void open() throws ClusterStorageException {
+ public void open(Authenticator auth) throws ClusterStorageException {
}
@@ -96,18 +99,44 @@ public class MemoryOnlyClusterStorage extends ClusterStorage { Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(sysKey);
ArrayList<String> result = new ArrayList<String>();
if (sysKeyMemCache != null) {
- if (path.endsWith("/"))
+ while (path.endsWith("/"))
path = path.substring(0,path.length()-1);
path = path+"/";
for (String thisPath : sysKeyMemCache.keySet()) {
if (thisPath.startsWith(path)) {
- int slash = path.indexOf('/');
- String suffix = slash>-1?path.substring(slash+1):path;
+ String end = thisPath.substring(path.length());
+ int slash = end.indexOf('/');
+ String suffix = slash>-1?end.substring(0, slash):end;
if (!result.contains(suffix)) result.add(suffix);
}
}
}
return result.toArray(new String[result.size()]);
}
-
+
+ public void dumpContents(int sysKey) {
+ synchronized(memoryCache) {
+ Logger.msg(0, "Cached Objects of Entity "+sysKey);
+ Map<String, C2KLocalObject> sysKeyMemCache = memoryCache.get(sysKey);
+ if (sysKeyMemCache == null) {
+ Logger.msg(0, "No cache found");
+ return;
+ }
+ try {
+ synchronized(sysKeyMemCache) {
+ for (Object name : sysKeyMemCache.keySet()) {
+ String path = (String) name;
+ try {
+ Logger.msg(0, " Path "+path+": "+sysKeyMemCache.get(path).getClass().getName());
+ } catch (NullPointerException e) {
+ Logger.msg(0, " Path "+path+": reaped");
+ }
+ }
+ }
+ } catch (ConcurrentModificationException ex) {
+ Logger.msg(0, "Cache modified - aborting");
+ }
+ }
+ Logger.msg(0, "Total number of cached entities: "+memoryCache.size());
+ }
}
diff --git a/src/main/java/com/c2kernel/persistency/NextKeyManager.java b/src/main/java/com/c2kernel/persistency/NextKeyManager.java new file mode 100644 index 0000000..e0d0013 --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/NextKeyManager.java @@ -0,0 +1,19 @@ +package com.c2kernel.persistency;
+
+import com.c2kernel.common.ObjectCannotBeUpdated;
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.lookup.AgentPath;
+import com.c2kernel.lookup.ItemPath;
+
+public interface NextKeyManager {
+
+ public ItemPath generateNextEntityKey()
+ throws ObjectCannotBeUpdated, ObjectNotFoundException;
+
+ public AgentPath generateNextAgentKey()
+ throws ObjectCannotBeUpdated, ObjectNotFoundException;
+
+ public void writeLastEntityKey(int sysKey) throws ObjectCannotBeUpdated, ObjectNotFoundException;
+
+ public ItemPath getLastEntityPath() throws ObjectNotFoundException;
+}
diff --git a/src/main/java/com/c2kernel/persistency/ProxyLoader.java b/src/main/java/com/c2kernel/persistency/ProxyLoader.java index a2f7141..57b91af 100644 --- a/src/main/java/com/c2kernel/persistency/ProxyLoader.java +++ b/src/main/java/com/c2kernel/persistency/ProxyLoader.java @@ -2,14 +2,16 @@ package com.c2kernel.persistency; import java.util.HashMap;
import java.util.StringTokenizer;
+import com.c2kernel.common.ObjectNotFoundException;
import com.c2kernel.entity.AgentHelper;
import com.c2kernel.entity.C2KLocalObject;
+import com.c2kernel.entity.Item;
import com.c2kernel.entity.ItemHelper;
-import com.c2kernel.entity.ManageableEntity;
-import com.c2kernel.lookup.EntityPath;
-import com.c2kernel.lookup.LDAPLookup;
+import com.c2kernel.lookup.ItemPath;
+import com.c2kernel.lookup.Lookup;
import com.c2kernel.persistency.outcome.Outcome;
import com.c2kernel.process.Gateway;
+import com.c2kernel.process.auth.Authenticator;
import com.c2kernel.utils.Logger;
/** Used by proxies to load clusters by queryData from the Entity.
@@ -17,12 +19,12 @@ import com.c2kernel.utils.Logger; */
public class ProxyLoader extends ClusterStorage {
- HashMap<Integer, ManageableEntity> entities = new HashMap<Integer, ManageableEntity>();
- LDAPLookup lookup;
+ HashMap<Integer, Item> entities = new HashMap<Integer, Item>();
+ Lookup lookup;
@Override
- public void open() throws ClusterStorageException {
- lookup = Gateway.getLDAPLookup();
+ public void open(Authenticator auth) throws ClusterStorageException {
+ lookup = Gateway.getLookup();
}
@Override
@@ -48,7 +50,7 @@ public class ProxyLoader extends ClusterStorage { @Override
public C2KLocalObject get(Integer sysKey, String path) throws ClusterStorageException {
try {
- ManageableEntity thisEntity = getIOR(sysKey);
+ Item thisEntity = getIOR(sysKey);
String type = getClusterType(path);
// fetch the xml from the item
@@ -61,6 +63,8 @@ public class ProxyLoader extends ClusterStorage { else
return (C2KLocalObject)Gateway.getMarshaller().unmarshall(queryData);
}
+ } catch (ObjectNotFoundException e) {
+ return null;
} catch (Exception e) {
Logger.error(e);
throw new ClusterStorageException(e.getMessage());
@@ -87,7 +91,7 @@ public class ProxyLoader extends ClusterStorage { @Override
public String[] getClusterContents(Integer sysKey, String path) throws ClusterStorageException {
try {
- ManageableEntity thisEntity = getIOR(sysKey);
+ Item thisEntity = getIOR(sysKey);
String contents = thisEntity.queryData(path+"/all");
StringTokenizer tok = new StringTokenizer(contents, ",");
String[] result = new String[tok.countTokens()];
@@ -101,7 +105,7 @@ public class ProxyLoader extends ClusterStorage { }
}
- private ManageableEntity getIOR(Integer sysKey) throws ClusterStorageException {
+ private Item getIOR(Integer sysKey) throws ClusterStorageException {
if (entities.containsKey(sysKey)) {
// check the cache
Logger.msg(7, "ProxyLoader.getIOR() - "+sysKey+" cached.");
@@ -110,22 +114,22 @@ public class ProxyLoader extends ClusterStorage { try {
Logger.msg(7, "ProxyLoader.getIOR() - Resolving "+sysKey+".");
- org.omg.CORBA.Object ior = lookup.getIOR(new EntityPath(sysKey.intValue()));
+ org.omg.CORBA.Object ior = lookup.resolve(new ItemPath(sysKey.intValue()));
- ManageableEntity thisEntity = null;
+ Item thisItem = null;
try {
- thisEntity = ItemHelper.narrow(ior);
+ thisItem = ItemHelper.narrow(ior);
} catch (org.omg.CORBA.BAD_PARAM ex) {
try {
- thisEntity = AgentHelper.narrow(ior);
+ thisItem = AgentHelper.narrow(ior);
} catch (org.omg.CORBA.BAD_PARAM ex2) {
throw new ClusterStorageException ("Could not narrow "+sysKey+" as a known Entity type");
}
}
Logger.msg(7, "ProxyLoader.getIOR() - Found "+sysKey+".");
- entities.put(sysKey, thisEntity);
- return thisEntity;
+ entities.put(sysKey, thisItem);
+ return thisItem;
} catch (Exception e) {
throw new ClusterStorageException("Error narrowing "+sysKey+": "+e.getMessage());
}
diff --git a/src/main/java/com/c2kernel/persistency/RemoteMap.java b/src/main/java/com/c2kernel/persistency/RemoteMap.java index b36648f..9f1d8a3 100644 --- a/src/main/java/com/c2kernel/persistency/RemoteMap.java +++ b/src/main/java/com/c2kernel/persistency/RemoteMap.java @@ -9,10 +9,10 @@ import java.util.TreeMap; import com.c2kernel.common.ObjectNotFoundException;
import com.c2kernel.entity.C2KLocalObject;
-import com.c2kernel.entity.proxy.EntityProxy;
-import com.c2kernel.entity.proxy.EntityProxyObserver;
+import com.c2kernel.entity.proxy.ItemProxy;
import com.c2kernel.entity.proxy.MemberSubscription;
-import com.c2kernel.lookup.EntityPath;
+import com.c2kernel.entity.proxy.ProxyObserver;
+import com.c2kernel.lookup.ItemPath;
import com.c2kernel.process.Gateway;
import com.c2kernel.utils.Logger;
@@ -34,9 +34,9 @@ public class RemoteMap<V extends C2KLocalObject> extends TreeMap<String, V> impl private String mPath = "";
Object keyLock = null;
TransactionManager storage;
- EntityProxyObserver<V> listener;
+ ProxyObserver<V> listener;
Comparator<String> comp;
- EntityProxy source;
+ ItemProxy source;
Object mLocker; // if this remote map will participate in a transaction
public RemoteMap(int sysKey, String path, Object locker) {
@@ -68,7 +68,7 @@ public class RemoteMap<V extends C2KLocalObject> extends TreeMap<String, V> impl } catch (NumberFormatException e) {}
storage = Gateway.getStorage();
- listener = new EntityProxyObserver<V>() {
+ listener = new ProxyObserver<V>() {
@Override
public void add(V obj) {
synchronized (this) {
@@ -88,7 +88,7 @@ public class RemoteMap<V extends C2KLocalObject> extends TreeMap<String, V> impl };
try {
- source = Gateway.getProxyManager().getProxy(new EntityPath(sysKey));
+ source = Gateway.getProxyManager().getProxy(new ItemPath(sysKey));
source.subscribe(new MemberSubscription<V>(listener, path, false));
} catch (Exception ex) {
Logger.error("Error subscribing to remote map. Changes will not be received");
diff --git a/src/main/java/com/c2kernel/persistency/TransactionManager.java b/src/main/java/com/c2kernel/persistency/TransactionManager.java index 86e8199..94b8123 100644 --- a/src/main/java/com/c2kernel/persistency/TransactionManager.java +++ b/src/main/java/com/c2kernel/persistency/TransactionManager.java @@ -7,6 +7,7 @@ import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.entity.C2KLocalObject;
import com.c2kernel.entity.agent.JobList;
import com.c2kernel.events.History;
+import com.c2kernel.process.auth.Authenticator;
import com.c2kernel.utils.Logger;
public class TransactionManager {
@@ -15,8 +16,8 @@ public class TransactionManager { HashMap<Object, ArrayList<TransactionEntry>> pendingTransactions;
ClusterStorageManager storage;
- public TransactionManager() throws ClusterStorageException {
- storage = new ClusterStorageManager();
+ public TransactionManager(Authenticator auth) throws ClusterStorageException {
+ storage = new ClusterStorageManager(auth);
locks = new HashMap<Integer, Object>();
pendingTransactions = new HashMap<Object, ArrayList<TransactionEntry>>();
}
@@ -63,16 +64,14 @@ public class TransactionManager { Integer sysKeyIntObj = new Integer(sysKey);
// check to see if the locker has been modifying this cluster
- synchronized(locks) {
- if (locks.containsKey(sysKeyIntObj) && locks.get(sysKeyIntObj).equals(locker)) {
- ArrayList<TransactionEntry> lockerTransaction = pendingTransactions.get(locker);
- for (TransactionEntry thisEntry : lockerTransaction) {
- if (sysKey == thisEntry.sysKey.intValue() && path.equals(thisEntry.getPath())) {
- if (thisEntry.obj == null)
- throw new ClusterStorageException("ClusterStorageManager.get() - Cluster " + path + " has been deleted in " + sysKey +
- " but not yet committed");
- return thisEntry.obj;
- }
+ if (locks.containsKey(sysKeyIntObj) && locks.get(sysKeyIntObj).equals(locker)) {
+ ArrayList<TransactionEntry> lockerTransaction = pendingTransactions.get(locker);
+ for (TransactionEntry thisEntry : lockerTransaction) {
+ if (sysKey == thisEntry.sysKey.intValue() && path.equals(thisEntry.getPath())) {
+ if (thisEntry.obj == null)
+ throw new ClusterStorageException("ClusterStorageManager.get() - Cluster " + path + " has been deleted in " + sysKey +
+ " but not yet committed");
+ return thisEntry.obj;
}
}
}
@@ -85,6 +84,7 @@ public class TransactionManager { */
public void put(int sysKey, C2KLocalObject obj, Object locker) throws ClusterStorageException {
Integer sysKeyIntObj = new Integer(sysKey);
+ Object tempLocker = null;
ArrayList<TransactionEntry> lockerTransaction;
String path = ClusterStorage.getPath(obj);
@@ -99,28 +99,35 @@ public class TransactionManager { throw new ClusterStorageException("ClusterStorageManager.get() - Access denied: Object " + sysKeyIntObj +
" has been locked for writing by " + thisLocker);
}
- else { // either we are the locker, or there is no locker
- if (locker == null) { // non-locking put/delete
- storage.put(sysKeyIntObj, obj);
- return;
+ else { // no locks for this item
+ if (locker == null) { // lock the item until the non-transactional put is complete :/
+ tempLocker = new Object();
+ locks.put(sysKeyIntObj, tempLocker);
+ lockerTransaction = null;
}
- else {// initialise the transaction
+ else { // initialise the transaction
locks.put(sysKeyIntObj, locker);
lockerTransaction = new ArrayList<TransactionEntry>();
pendingTransactions.put(locker, lockerTransaction);
}
}
-
- // create the new entry in the transaction table
- TransactionEntry newEntry = new TransactionEntry(sysKeyIntObj, path, obj);
- /* equals() in TransactionEntry only compares sysKey and path, so we can use
- * contains() in ArrayList to looks for preexisting entries for this cluster
- * and overwrite them.
- */
- if (lockerTransaction.contains(newEntry))
- lockerTransaction.remove(newEntry);
- lockerTransaction.add(newEntry);
}
+
+ if (tempLocker != null) { // non-locking put/delete
+ storage.put(sysKeyIntObj, obj);
+ locks.remove(sysKeyIntObj);
+ return;
+ }
+
+ // create the new entry in the transaction table
+ TransactionEntry newEntry = new TransactionEntry(sysKeyIntObj, path, obj);
+ /* equals() in TransactionEntry only compares sysKey and path, so we can use
+ * contains() in ArrayList to looks for preexisting entries for this cluster
+ * and overwrite them.
+ */
+ if (lockerTransaction.contains(newEntry))
+ lockerTransaction.remove(newEntry);
+ lockerTransaction.add(newEntry);
}
/** Public delete method. Uses the put method, with null as the object value.
@@ -128,6 +135,7 @@ public class TransactionManager { public void remove(int sysKey, String path, Object locker) throws ClusterStorageException {
Integer sysKeyIntObj = new Integer(sysKey);
ArrayList<TransactionEntry> lockerTransaction;
+ Object tempLocker = null;
synchronized(locks) {
// look to see if this object is already locked
if (locks.containsKey(sysKeyIntObj)) {
@@ -141,8 +149,9 @@ public class TransactionManager { }
else { // either we are the locker, or there is no locker
if (locker == null) { // non-locking put/delete
- storage.remove(sysKeyIntObj, path);
- return;
+ tempLocker = new Object();
+ locks.put(sysKeyIntObj, tempLocker);
+ lockerTransaction = null;
}
else {// initialise the transaction
locks.put(sysKeyIntObj, locker);
@@ -150,17 +159,24 @@ public class TransactionManager { pendingTransactions.put(locker, lockerTransaction);
}
}
-
- // create the new entry in the transaction table
- TransactionEntry newEntry = new TransactionEntry(sysKeyIntObj, path, null);
- /* equals() in TransactionEntry only compares sysKey and path, so we can use
- * contains() in ArrayList to looks for preexisting entries for this cluster
- * and overwrite them.
- */
- if (lockerTransaction.contains(newEntry))
- lockerTransaction.remove(newEntry);
- lockerTransaction.add(newEntry);
}
+
+ if (tempLocker != null) {
+ storage.remove(sysKeyIntObj, path);
+ locks.remove(sysKeyIntObj);
+ return;
+ }
+
+ // create the new entry in the transaction table
+ TransactionEntry newEntry = new TransactionEntry(sysKeyIntObj, path, null);
+ /* equals() in TransactionEntry only compares sysKey and path, so we can use
+ * contains() in ArrayList to looks for preexisting entries for this cluster
+ * and overwrite them.
+ */
+ if (lockerTransaction.contains(newEntry))
+ lockerTransaction.remove(newEntry);
+ lockerTransaction.add(newEntry);
+
}
/**
diff --git a/src/main/java/com/c2kernel/persistency/XMLClusterStorage.java b/src/main/java/com/c2kernel/persistency/XMLClusterStorage.java index f63dac6..e6c6e9f 100644 --- a/src/main/java/com/c2kernel/persistency/XMLClusterStorage.java +++ b/src/main/java/com/c2kernel/persistency/XMLClusterStorage.java @@ -3,10 +3,11 @@ import java.io.File; import java.util.ArrayList;
import com.c2kernel.entity.C2KLocalObject;
-import com.c2kernel.lookup.EntityPath;
-import com.c2kernel.lookup.InvalidEntityPathException;
+import com.c2kernel.lookup.InvalidItemPathException;
+import com.c2kernel.lookup.ItemPath;
import com.c2kernel.persistency.outcome.Outcome;
import com.c2kernel.process.Gateway;
+import com.c2kernel.process.auth.Authenticator;
import com.c2kernel.utils.FileStringUtility;
import com.c2kernel.utils.Logger;
@@ -17,7 +18,7 @@ public class XMLClusterStorage extends ClusterStorage { }
@Override
- public void open() throws ClusterStorageException {
+ public void open(Authenticator auth) throws ClusterStorageException {
String rootProp = Gateway.getProperties().getProperty("XMLStorage.root");
if (rootProp == null)
throw new ClusterStorageException("XMLClusterStorage.open() - Root path not given in config file.");
@@ -145,8 +146,8 @@ public class XMLClusterStorage extends ClusterStorage { }
}
- protected String getFilePath(Integer sysKey, String path) throws InvalidEntityPathException {
- EntityPath thisEntity = new EntityPath(sysKey.intValue());
+ protected String getFilePath(Integer sysKey, String path) throws InvalidItemPathException {
+ ItemPath thisEntity = new ItemPath(sysKey.intValue());
if (path.length() == 0 || path.charAt(0) != '/') path = "/"+path;
String filePath = rootDir+thisEntity.toString()+path;
Logger.msg(8, "XMLClusterStorage.getFilePath() - "+filePath);
diff --git a/src/main/java/com/c2kernel/persistency/outcome/Outcome.java b/src/main/java/com/c2kernel/persistency/outcome/Outcome.java index b2f706b..70a2a24 100644 --- a/src/main/java/com/c2kernel/persistency/outcome/Outcome.java +++ b/src/main/java/com/c2kernel/persistency/outcome/Outcome.java @@ -5,8 +5,14 @@ import java.util.StringTokenizer; import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
+import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import org.w3c.dom.bootstrap.DOMImplementationRegistry;
@@ -14,6 +20,7 @@ import org.w3c.dom.ls.DOMImplementationLS; import org.w3c.dom.ls.LSSerializer;
import org.xml.sax.InputSource;
+import com.c2kernel.common.InvalidDataException;
import com.c2kernel.common.ObjectNotFoundException;
import com.c2kernel.common.PersistencyException;
import com.c2kernel.entity.C2KLocalObject;
@@ -29,6 +36,7 @@ public class Outcome implements C2KLocalObject { Document dom;
static DocumentBuilder parser;
static DOMImplementationLS impl;
+ static XPath xpath;
static {
// Set up parser
@@ -50,7 +58,9 @@ public class Outcome implements C2KLocalObject { Logger.error(e);
Logger.die("Cannot function without XML serialiser");
}
-
+
+ XPathFactory xPathFactory = XPathFactory.newInstance();
+ xpath = xPathFactory.newXPath();
}
//id is the eventID
@@ -64,7 +74,7 @@ public class Outcome implements C2KLocalObject { public Outcome(String path, String data) throws PersistencyException {
// derive all the meta data from the path
StringTokenizer tok = new StringTokenizer(path,"/");
- if (tok.countTokens() != 3 && !(tok.nextToken().equals("Outcome")))
+ if (tok.countTokens() != 3 && !(tok.nextToken().equals(ClusterStorage.OUTCOME)))
throw new PersistencyException("Outcome() - Outcome path must have three components: "+path, null);
mSchemaType = tok.nextToken();
String verstring = tok.nextToken();
@@ -110,11 +120,74 @@ public class Outcome implements C2KLocalObject { }
public void setData(Document data) {
- mData = serialize(data, false);
dom = data;
+ mData = null;
+ }
+
+ public String getFieldByXPath(String xpath) throws XPathExpressionException, InvalidDataException {
+ Node field = getNodeByXPath(xpath);
+ if (field == null)
+ throw new InvalidDataException(xpath, "");
+
+ else if (field.getNodeType()==Node.TEXT_NODE || field.getNodeType()==Node.CDATA_SECTION_NODE)
+ return field.getNodeValue();
+
+ else if (field.getNodeType()==Node.ELEMENT_NODE) {
+ NodeList fieldChildren = field.getChildNodes();
+ if (fieldChildren.getLength() == 0)
+ throw new InvalidDataException("No child node for element", "");
+
+ else if (fieldChildren.getLength() == 1) {
+ Node child = fieldChildren.item(0);
+ if (child.getNodeType()==Node.TEXT_NODE || child.getNodeType()==Node.CDATA_SECTION_NODE)
+ return child.getNodeValue();
+ else
+ throw new InvalidDataException("Can't get data from child node of type "+child.getNodeName(), "");
+ }
+ else
+ throw new InvalidDataException("Element "+xpath+" has too many children", "");
+ }
+ else if (field.getNodeType()==Node.ATTRIBUTE_NODE)
+ return field.getNodeValue();
+ else
+ throw new InvalidDataException("Don't know what to do with node "+field.getNodeName(), "");
+ }
+
+ public void setFieldByXPath(String xpath, String data) throws XPathExpressionException, InvalidDataException {
+ Node field = getNodeByXPath(xpath);
+ if (field == null)
+ throw new InvalidDataException(xpath, "");
+
+ else if (field.getNodeType()==Node.ELEMENT_NODE) {
+ NodeList fieldChildren = field.getChildNodes();
+ if (fieldChildren.getLength() == 0) {
+ field.appendChild(dom.createTextNode(data));
+ }
+ else if (fieldChildren.getLength() == 1) {
+ Node child = fieldChildren.item(0);
+ switch (child.getNodeType()) {
+ case Node.TEXT_NODE:
+ case Node.CDATA_SECTION_NODE:
+ child.setNodeValue(data);
+ break;
+ default:
+ throw new InvalidDataException("Can't set child node of type "+child.getNodeName(), "");
+ }
+ }
+ else
+ throw new InvalidDataException("Element "+xpath+" has too many children", "");
+ }
+ else if (field.getNodeType()==Node.ATTRIBUTE_NODE)
+ field.setNodeValue(data);
+ else
+ throw new InvalidDataException("Don't know what to do with node "+field.getNodeName(), "");
}
+
public String getData() {
+ if (mData == null && dom != null) {
+ mData = serialize(dom, false);
+ }
return mData;
}
@@ -153,7 +226,10 @@ public class Outcome implements C2KLocalObject { if (dom == null)
try {
synchronized (parser) {
- dom = parser.parse(new InputSource(new StringReader(mData)));
+ if (mData!=null)
+ dom = parser.parse(new InputSource(new StringReader(mData)));
+ else
+ dom = parser.newDocument();
}
} catch (Exception e) {
Logger.error(e);
@@ -169,6 +245,20 @@ public class Outcome implements C2KLocalObject { else
return null;
}
+
+ public NodeList getNodesByXPath(String xpathExpr) throws XPathExpressionException {
+
+ XPathExpression expr = xpath.compile(xpathExpr);
+ return (NodeList)expr.evaluate(getDOM(), XPathConstants.NODESET);
+
+ }
+
+ public Node getNodeByXPath(String xpathExpr) throws XPathExpressionException {
+
+ XPathExpression expr = xpath.compile(xpathExpr);
+ return (Node)expr.evaluate(getDOM(), XPathConstants.NODE);
+
+ }
static public String serialize(Document doc, boolean prettyPrint)
{
diff --git a/src/main/java/com/c2kernel/persistency/outcome/OutcomeInitiator.java b/src/main/java/com/c2kernel/persistency/outcome/OutcomeInitiator.java new file mode 100644 index 0000000..82068bb --- /dev/null +++ b/src/main/java/com/c2kernel/persistency/outcome/OutcomeInitiator.java @@ -0,0 +1,10 @@ +package com.c2kernel.persistency.outcome;
+
+import com.c2kernel.common.InvalidDataException;
+import com.c2kernel.entity.agent.Job;
+
+public interface OutcomeInitiator {
+
+ public String initOutcome(Job job) throws InvalidDataException;
+
+}
|
