diff options
Diffstat (limited to 'source/com/c2kernel/entity/proxy')
| -rw-r--r--[-rwxr-xr-x] | source/com/c2kernel/entity/proxy/AgentProxy.java | 91 | ||||
| -rw-r--r--[-rwxr-xr-x] | source/com/c2kernel/entity/proxy/DomainPathSubscriber.java | 2 | ||||
| -rw-r--r-- | source/com/c2kernel/entity/proxy/EntityProxy.java | 76 | ||||
| -rw-r--r-- | source/com/c2kernel/entity/proxy/EntityProxyManager.java | 106 | ||||
| -rw-r--r--[-rwxr-xr-x] | source/com/c2kernel/entity/proxy/EntityProxyObserver.java | 6 | ||||
| -rw-r--r-- | source/com/c2kernel/entity/proxy/ItemProxy.java | 53 | ||||
| -rwxr-xr-x | source/com/c2kernel/entity/proxy/MemberControl.java | 43 | ||||
| -rw-r--r-- | source/com/c2kernel/entity/proxy/MemberSubscription.java | 72 | ||||
| -rw-r--r-- | source/com/c2kernel/entity/proxy/ProxyClientConnection.java | 74 | ||||
| -rw-r--r--[-rwxr-xr-x] | source/com/c2kernel/entity/proxy/ProxyMessage.java | 39 | ||||
| -rw-r--r--[-rwxr-xr-x] | source/com/c2kernel/entity/proxy/ProxyServerConnection.java | 27 | ||||
| -rw-r--r--[-rwxr-xr-x] | source/com/c2kernel/entity/proxy/ProxySubscriber.java | 4 |
12 files changed, 284 insertions, 309 deletions
diff --git a/source/com/c2kernel/entity/proxy/AgentProxy.java b/source/com/c2kernel/entity/proxy/AgentProxy.java index af77031..f2e8283 100755..100644 --- a/source/com/c2kernel/entity/proxy/AgentProxy.java +++ b/source/com/c2kernel/entity/proxy/AgentProxy.java @@ -66,8 +66,9 @@ public class AgentProxy extends EntityProxy throw new ObjectNotFoundException();
}
}
-
- public ManageableEntity narrow() throws ObjectNotFoundException
+
+ @Override
+ public ManageableEntity narrow() throws ObjectNotFoundException
{
try {
return AgentHelper.narrow(mIOR);
@@ -85,48 +86,48 @@ public class AgentProxy extends EntityProxy ObjectNotFoundException
{
Logger.msg(7, "AgentProxy::initialise - started");
-
+
((Agent)getEntity()).initialise( agentProps );
}
-
+
public AgentPath getPath() {
return path;
}
-
+
/**
- * Executes a job on the given item using this agent.
- *
+ * Executes a job on the given item using this agent.
+ *
* @param item - item holding this job
* @param job - the job to execute
*/
- public void execute(ItemProxy item, Job job)
- throws AccessRightsException,
- InvalidTransitionException,
- ObjectNotFoundException,
- InvalidDataException,
+ public void execute(ItemProxy item, Job job)
+ throws AccessRightsException,
+ InvalidTransitionException,
+ ObjectNotFoundException,
+ InvalidDataException,
PersistencyException,
ObjectAlreadyExistsException
- {
+ {
OutcomeValidator validator = null;
String scriptName = job.getActPropString("ScriptName");
Date startTime = new Date();
Logger.msg(3, "AgentProxy - executing "+job.getStepPath()+" for "+path.getAgentName());
// get the outcome validator if present
- if (job.isOutcomeUsed())
+ if (job.isOutcomeUsed())
{
-
+
// get schema info from act props
String schemaName = job.getActPropString("SchemaType");
int schemaVersion;
try {
schemaVersion = Integer.parseInt(job.getActPropString("SchemaVersion"));
- } catch (Exception e) {
+ } catch (Exception e) {
throw new InvalidDataException(e.getClass().getName()+" extracing schema version", "");
}
Logger.msg(5, "AgentProxy - fetching schema "+schemaName+"_"+schemaVersion+" for validation");
// retrieve schema
Schema schema = LocalObjectLoader.getSchema(schemaName, schemaVersion);
-
+
if (schema == null)
throw new InvalidDataException("Job references outcome type "+schemaName+" version "+schemaVersion+" that does not exist in this centre.", "");
@@ -136,12 +137,12 @@ public class AgentProxy extends EntityProxy throw new InvalidDataException("Could not create validator: "+e.getMessage(), "");
}
}
-
- if(scriptName != null && scriptName.length() > 0 &&
+
+ if(scriptName != null && scriptName.length() > 0 &&
(job.getPossibleTransition() == Transitions.DONE || job.getPossibleTransition() == Transitions.COMPLETE)) {
Logger.msg(3, "AgentProxy - executing script "+scriptName);
try {
-
+
// pre-validate outcome from script if there is one
if (job.getOutcomeString()!= null && validator != null) {
Logger.msg(5, "AgentProxy - validating outcome before script execution");
@@ -151,10 +152,10 @@ public class AgentProxy extends EntityProxy throw new InvalidDataException(error, "");
}
}
-
+
// load script
ErrorInfo scriptErrors = (ErrorInfo)callScript(item, job);
-
+
if (scriptErrors.getFatal()) {
Logger.msg(3, "AgentProxy - fatal script error");
Logger.error(scriptErrors.getErrors());
@@ -174,7 +175,7 @@ public class AgentProxy extends EntityProxy if (error.length() > 0)
throw new InvalidDataException(error, "");
}
-
+
job.setAgentId(getSystemKey());
Logger.msg(3, "AgentProxy - submitting job to item proxy");
item.requestAction(job);
@@ -184,16 +185,16 @@ public class AgentProxy extends EntityProxy Logger.msg(3, "Execution took "+secsNow+" seconds");
}
}
-
+
public Object callScript(ItemProxy item, Job job) throws ScriptingEngineException {
Script script = new Script(item, this, job);
return script.execute();
}
-
+
/**
- * Standard execution of jobs. Note that this method should always be the one used from clients - all execution
- * parameters are taken from the job where they're probably going to be correct.
- *
+ * Standard execution of jobs. Note that this method should always be the one used from clients - all execution
+ * parameters are taken from the job where they're probably going to be correct.
+ *
* @param job
* @throws AccessRightsException
* @throws InvalidDataException
@@ -202,7 +203,7 @@ public class AgentProxy extends EntityProxy * @throws PersistencyException
* @throws ObjectAlreadyExistsException
*/
- public void execute(Job job)
+ public void execute(Job job)
throws AccessRightsException,
InvalidDataException,
InvalidTransitionException,
@@ -217,8 +218,8 @@ public class AgentProxy extends EntityProxy throw new ObjectNotFoundException("Job contained invalid item sysKey: "+job.getItemSysKey(), "");
}
}
-
- public void execute(ItemProxy item, String predefStep, C2KLocalObject obj)
+
+ public void execute(ItemProxy item, String predefStep, C2KLocalObject obj)
throws AccessRightsException,
InvalidDataException,
InvalidTransitionException,
@@ -235,8 +236,8 @@ public class AgentProxy extends EntityProxy }
execute(item, predefStep, param);
}
-
- public void execute(ItemProxy item, String predefStep, String param)
+
+ public void execute(ItemProxy item, String predefStep, String param)
throws AccessRightsException,
InvalidDataException,
InvalidTransitionException,
@@ -248,8 +249,8 @@ public class AgentProxy extends EntityProxy params[0] = param;
execute(item, predefStep, params);
}
-
- public void execute(ItemProxy item, String predefStep, String[] params)
+
+ public void execute(ItemProxy item, String predefStep, String[] params)
throws AccessRightsException,
InvalidDataException,
InvalidTransitionException,
@@ -259,42 +260,42 @@ public class AgentProxy extends EntityProxy {
item.requestAction(getSystemKey(), "workflow/predefined/"+predefStep, Transitions.DONE, PredefinedStep.bundleData(params));
}
-
+
/** Wrappers for scripts */
public String marshall(Object obj) throws Exception {
return CastorXMLUtility.marshall(obj);
}
-
+
public Object unmarshall(String obj) throws Exception {
return CastorXMLUtility.unmarshall(obj);
}
-
+
/** Let scripts resolve items */
public ItemProxy searchItem(String name) throws ObjectNotFoundException {
- Enumeration results = Gateway.getLDAPLookup().search(new DomainPath(""),name);
-
+ Enumeration<?> results = Gateway.getLDAPLookup().search(new DomainPath(""),name);
+
Path returnPath = null;
if (!results.hasMoreElements())
throw new ObjectNotFoundException(name, "");
-
+
while(results.hasMoreElements()) {
Path nextMatch = (Path)results.nextElement();
if (returnPath != null && nextMatch.getSysKey() != -1 && returnPath.getSysKey() != nextMatch.getSysKey())
throw new ObjectNotFoundException("Too many items with that name");
returnPath = nextMatch;
}
-
+
return (ItemProxy)Gateway.getProxyManager().getProxy(returnPath);
}
-
+
public ItemProxy getItem(String path) throws ObjectNotFoundException {
return (getItem(new DomainPath(path)));
}
-
+
public ItemProxy getItem(DomainPath path) throws ObjectNotFoundException {
return (ItemProxy)Gateway.getProxyManager().getProxy(path);
}
-
+
public ItemProxy getItemBySysKey(int sysKey) throws ObjectNotFoundException, InvalidEntityPathException {
return (ItemProxy)Gateway.getProxyManager().getProxy(new EntityPath(sysKey));
}
diff --git a/source/com/c2kernel/entity/proxy/DomainPathSubscriber.java b/source/com/c2kernel/entity/proxy/DomainPathSubscriber.java index e09178d..4089325 100755..100644 --- a/source/com/c2kernel/entity/proxy/DomainPathSubscriber.java +++ b/source/com/c2kernel/entity/proxy/DomainPathSubscriber.java @@ -12,7 +12,7 @@ import com.c2kernel.lookup.DomainPath; **************************************************************************/
public interface DomainPathSubscriber {
-
+
public void pathAdded(DomainPath path);
public void pathRemoved(DomainPath path);
}
diff --git a/source/com/c2kernel/entity/proxy/EntityProxy.java b/source/com/c2kernel/entity/proxy/EntityProxy.java index b34653f..a9f6066 100644 --- a/source/com/c2kernel/entity/proxy/EntityProxy.java +++ b/source/com/c2kernel/entity/proxy/EntityProxy.java @@ -10,14 +10,17 @@ package com.c2kernel.entity.proxy;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Iterator;
import com.c2kernel.common.ObjectNotFoundException;
-import com.c2kernel.entity.*;
-import com.c2kernel.persistency.*;
+import com.c2kernel.entity.C2KLocalObject;
+import com.c2kernel.entity.ManageableEntity;
+import com.c2kernel.persistency.ClusterStorageException;
import com.c2kernel.process.Gateway;
import com.c2kernel.property.Property;
-import com.c2kernel.utils.*;
+import com.c2kernel.utils.CastorXMLUtility;
+import com.c2kernel.utils.Logger;
/******************************************************************************
@@ -35,7 +38,7 @@ abstract public class EntityProxy implements ManageableEntity protected ManageableEntity mEntity = null;
protected org.omg.CORBA.Object mIOR;
protected int mSystemKey;
- private HashMap<MemberSubscription, EntityProxyObserver> mSubscriptions;
+ private HashMap<MemberSubscription<?>, EntityProxyObserver<?>> mSubscriptions;
/**************************************************************************
*
@@ -45,7 +48,7 @@ abstract public class EntityProxy implements ManageableEntity throws ObjectNotFoundException
{
Logger.msg(8,"EntityProxy::EntityProxy() - Initialising '" +systemKey+ "' entity");
-
+
initialise( ior, systemKey);
}
@@ -60,7 +63,7 @@ abstract public class EntityProxy implements ManageableEntity mIOR = ior;
mSystemKey = systemKey;
- mSubscriptions = new HashMap<MemberSubscription, EntityProxyObserver>();
+ mSubscriptions = new HashMap<MemberSubscription<?>, EntityProxyObserver<?>>();
}
@@ -74,14 +77,15 @@ abstract public class EntityProxy implements ManageableEntity }
return mEntity;
}
-
+
abstract public ManageableEntity narrow() throws ObjectNotFoundException;
-
+
/**************************************************************************
*
**************************************************************************/
//check who is using.. and if toString() is sufficient
- public int getSystemKey()
+ @Override
+ public int getSystemKey()
{
return mSystemKey;
}
@@ -90,7 +94,8 @@ abstract public class EntityProxy implements ManageableEntity /**************************************************************************
*
**************************************************************************/
- public String queryData( String path )
+ @Override
+ public String queryData( String path )
throws ObjectNotFoundException
{
@@ -114,9 +119,9 @@ abstract public class EntityProxy implements ManageableEntity } catch (Exception e) {
Logger.error(e);
return "<ERROR>"+e.getMessage()+"</ERROR>";
- }
+ }
}
-
+
public String[] getContents( String path ) throws ObjectNotFoundException {
try {
return Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()));
@@ -160,39 +165,36 @@ abstract public class EntityProxy implements ManageableEntity throw new ObjectNotFoundException();
}
}
-
+
public String getName()
{
try {
- return getProperty("Name");
+ return getProperty("Name");
} catch (ObjectNotFoundException ex) {
return null;
}
}
-
+
/**************************************************************************
* Subscription methods
**************************************************************************/
- public void subscribe (EntityProxyObserver observer,
- String interest,
- boolean preload)
- {
- MemberSubscription newSub = new MemberSubscription(this, interest, observer, preload);
- synchronized (this){
- mSubscriptions.put( newSub, observer );
+ public void subscribe (MemberSubscription<?> newSub) {
+
+ newSub.setSubject(this);
+ synchronized (this){
+ mSubscriptions.put( newSub, newSub.getObserver() );
}
new Thread(newSub).start();
- Logger.msg(7, "Subscribed "+observer.getClass().getName()+" for "+interest);
+ Logger.msg(7, "Subscribed "+newSub.getObserver().getClass().getName()+" for "+newSub.interest);
}
-
- public void unsubscribe(EntityProxyObserver observer)
+ public void unsubscribe(EntityProxyObserver<?> observer)
{
synchronized (this){
- for (Iterator e = mSubscriptions.keySet().iterator(); e.hasNext();) {
- MemberSubscription thisSub = (MemberSubscription)e.next();
+ for (Iterator<MemberSubscription<?>> e = mSubscriptions.keySet().iterator(); e.hasNext();) {
+ MemberSubscription<?> thisSub = e.next();
if (mSubscriptions.get( thisSub ) == observer) {
e.remove();
Logger.msg(7, "Unsubscribed "+observer.getClass().getName());
@@ -200,14 +202,13 @@ abstract public class EntityProxy implements ManageableEntity }
}
}
-
+
public void dumpSubscriptions(int logLevel) {
if (mSubscriptions.size() == 0) return;
Logger.msg(logLevel, "Subscriptions to proxy "+mSystemKey+":");
synchronized(this) {
- for (Iterator iter = mSubscriptions.keySet().iterator(); iter.hasNext();) {
- MemberSubscription element = (MemberSubscription)iter.next();
- EntityProxyObserver obs = element.getObserver();
+ for (MemberSubscription<?> element : mSubscriptions.keySet()) {
+ EntityProxyObserver<?> obs = element.getObserver();
if (obs != null)
Logger.msg(logLevel, " "+element.getObserver().getClass().getName()+" subscribed to "+element.interest);
else
@@ -215,14 +216,14 @@ abstract public class EntityProxy implements ManageableEntity }
}
}
-
+
public void notify(ProxyMessage message) {
Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey);
synchronized (this){
if (!message.getServer().equals(EntityProxyManager.serverName))
Gateway.getStorage().clearCache(mSystemKey, message.getPath());
- for (Iterator e = mSubscriptions.keySet().iterator(); e.hasNext();) {
- MemberSubscription newSub = (MemberSubscription)e.next();
+ for (Iterator<MemberSubscription<?>> e = mSubscriptions.keySet().iterator(); e.hasNext();) {
+ MemberSubscription<?> newSub = e.next();
if (newSub.getObserver() == null) { // phantom
Logger.msg(4, "Removing phantom subscription to "+newSub.interest);
e.remove();
@@ -232,11 +233,12 @@ abstract public class EntityProxy implements ManageableEntity }
}
}
-
+
/**
* If this is reaped, clear out the cache for it too.
*/
- protected void finalize() throws Throwable {
+ @Override
+ protected void finalize() throws Throwable {
Logger.msg(7, "Proxy "+mSystemKey+" reaped");
Gateway.getStorage().clearCache(mSystemKey, null);
Gateway.getProxyManager().removeProxy(mSystemKey);
diff --git a/source/com/c2kernel/entity/proxy/EntityProxyManager.java b/source/com/c2kernel/entity/proxy/EntityProxyManager.java index 3224da7..8ad4576 100644 --- a/source/com/c2kernel/entity/proxy/EntityProxyManager.java +++ b/source/com/c2kernel/entity/proxy/EntityProxyManager.java @@ -10,7 +10,11 @@ package com.c2kernel.entity.proxy;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
import com.c2kernel.common.InvalidDataException;
import com.c2kernel.common.ObjectNotFoundException;
@@ -25,12 +29,12 @@ import com.c2kernel.utils.SoftCache; import com.c2kernel.utils.server.SimpleTCPIPServer;
-public class EntityProxyManager
+public class EntityProxyManager
{
SoftCache<Integer, EntityProxy> proxyPool = new SoftCache<Integer, EntityProxy>(50);
HashMap<DomainPathSubscriber, DomainPath> treeSubscribers = new HashMap<DomainPathSubscriber, DomainPath>();
HashMap<String, ProxyServerConnection> connections = new HashMap<String, ProxyServerConnection>();
-
+
// server objects
static ArrayList<ProxyClientConnection> proxyClients = new ArrayList<ProxyClientConnection>();
static SimpleTCPIPServer proxyServer = null;
@@ -42,8 +46,8 @@ public class EntityProxyManager public EntityProxyManager()
{
Logger.msg(5, "EntityProxyManager - Starting.....");
-
- Enumeration servers = Gateway.getLDAPLookup().searchEntities(new DomainPath("/servers"));
+
+ Enumeration<?> servers = Gateway.getLDAPLookup().searchEntities(new DomainPath("/servers"));
while(servers.hasMoreElements()) {
Path thisServerPath = (Path)servers.nextElement();
try {
@@ -52,65 +56,62 @@ public class EntityProxyManager String portStr = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/ProxyPort", null)).getValue();
int remotePort = Integer.parseInt(portStr);
connectToProxyServer(remoteServer, remotePort);
-
+
} catch (Exception ex) {
Logger.error("Exception retrieving proxy server connection data for "+thisServerPath);
Logger.error(ex);
}
}
}
-
+
public void connectToProxyServer(String name, int port) {
- ProxyServerConnection oldConn = (ProxyServerConnection)connections.get(name);
+ ProxyServerConnection oldConn = connections.get(name);
if (oldConn != null)
oldConn.shutdown();
connections.put(name, new ProxyServerConnection(name, port, this));
}
-
-
+
+
protected void resubscribe(ProxyServerConnection conn) {
synchronized (proxyPool) {
- for (Iterator iter = proxyPool.keySet().iterator(); iter.hasNext();) {
- Integer key = (Integer)iter.next();
+ for (Integer key : proxyPool.keySet()) {
ProxyMessage sub = new ProxyMessage(key.intValue(), ProxyMessage.ADDPATH, false);
Logger.msg(5, "Subscribing to entity "+key);
conn.sendMessage(sub);
}
}
}
-
+
/**
* @param sub
*/
private void sendMessage(ProxyMessage sub) {
- for (Iterator iter = connections.values().iterator(); iter.hasNext();) {
- ProxyServerConnection element = (ProxyServerConnection) iter.next();
+ for (ProxyServerConnection element : connections.values()) {
element.sendMessage(sub);
}
-
+
}
public void shutdown() {
Logger.msg("EntityProxyManager.shutdown() - flagging shutdown of server connections");
- for (Iterator iter = connections.values().iterator(); iter.hasNext();) {
- ProxyServerConnection element = (ProxyServerConnection) iter.next();
+ for (ProxyServerConnection element : connections.values()) {
element.shutdown();
}
}
-
+
protected void processMessage(ProxyMessage thisMessage) throws InvalidDataException {
if (Logger.doLog(9)) Logger.msg(9, thisMessage.toString());
-
+
if (thisMessage.getPath().equals(ProxyMessage.PINGPATH)) // ping response
return;
-
+
if (thisMessage.getSysKey() == ProxyMessage.NA) // must be domain path info
informTreeSubscribers(thisMessage.getState(), thisMessage.getPath());
else {
// proper proxy message
Logger.msg(5, "Received proxy message: "+thisMessage.toString());
Integer key = new Integer(thisMessage.getSysKey());
- EntityProxy relevant = (EntityProxy)proxyPool.get(key);
+ EntityProxy relevant = proxyPool.get(key);
if (relevant == null)
Logger.warning("Received proxy message for sysKey "+thisMessage.getSysKey()+" which we don't have a proxy for.");
else
@@ -120,18 +121,17 @@ public class EntityProxyManager Logger.error("Error caught notifying proxy listener "+relevant.toString()+" of "+thisMessage.toString());
Logger.error(ex);
}
- }
+ }
}
-
+
private void informTreeSubscribers(boolean state, String path) {
DomainPath last = new DomainPath(path);
DomainPath parent; boolean first = true;
synchronized(treeSubscribers) {
while((parent = last.getParent()) != null) {
-
- for (Iterator iter = treeSubscribers.keySet().iterator(); iter.hasNext();) {
- DomainPathSubscriber sub = (DomainPathSubscriber)iter.next();
- DomainPath interest = (DomainPath)treeSubscribers.get(sub);
+
+ for (DomainPathSubscriber sub : treeSubscribers.keySet()) {
+ DomainPath interest = treeSubscribers.get(sub);
if (interest.equals(parent)) {
if (state == ProxyMessage.ADDED)
sub.pathAdded(last);
@@ -142,26 +142,26 @@ public class EntityProxyManager last = parent;
first = false;
}
- }
+ }
}
-
+
public void subscribeTree(DomainPathSubscriber sub, DomainPath interest) {
synchronized(treeSubscribers) {
treeSubscribers.put(sub, interest);
}
}
-
+
public void unsubscribeTree(DomainPathSubscriber sub) {
synchronized(treeSubscribers) {
treeSubscribers.remove(sub);
- }
+ }
}
/**************************************************************************
*
**************************************************************************/
- private EntityProxy createProxy( org.omg.CORBA.Object ior,
- int systemKey,
+ private EntityProxy createProxy( org.omg.CORBA.Object ior,
+ int systemKey,
boolean isItem )
throws ObjectNotFoundException
{
@@ -185,7 +185,7 @@ public class EntityProxyManager reportCurrentProxies(9);
return ( newProxy );
}
-
+
protected void removeProxy( int systemKey )
{
ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.DELPATH, true);
@@ -210,7 +210,7 @@ public class EntityProxyManager synchronized(proxyPool) {
EntityProxy newProxy;
// return it if it exists
- newProxy = (EntityProxy)proxyPool.get(key);
+ newProxy = proxyPool.get(key);
if (newProxy == null) {
// create a new one
newProxy = createProxy(ior, systemKey, isItem );
@@ -250,14 +250,14 @@ public class EntityProxyManager Logger.msg(logLevel, "Current proxies: ");
try {
synchronized(proxyPool) {
- Iterator i = proxyPool.keySet().iterator();
-
+ Iterator<Integer> i = proxyPool.keySet().iterator();
+
for( int count=0; i.hasNext(); count++ )
{
- Integer nextProxy = (Integer)i.next();
- EntityProxy thisProxy = (EntityProxy)proxyPool.get(nextProxy);
+ Integer nextProxy = i.next();
+ EntityProxy thisProxy = proxyPool.get(nextProxy);
if (thisProxy != null)
- Logger.msg(logLevel,
+ Logger.msg(logLevel,
"" + count + ": "
+ proxyPool.get(nextProxy).getClass().getName()
+ ": " + nextProxy);
@@ -267,12 +267,12 @@ public class EntityProxyManager Logger.msg(logLevel, "Proxy cache modified. Aborting.");
}
}
-
+
/**************************************************************************
* Static Proxy Server methods
- **************************************************************************/
-
+ **************************************************************************/
+
/**
* Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops
* @param c2kProps
@@ -286,7 +286,7 @@ public class EntityProxyManager Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of entity changes.");
return;
}
-
+
// set up the proxy server
try {
int portNo = Integer.parseInt(port);
@@ -298,40 +298,38 @@ public class EntityProxyManager Logger.error(ex);
}
}
-
+
public static void sendProxyEvent(ProxyMessage message) {
if (proxyServer != null && message.getPath() != null)
synchronized(proxyClients) {
- for (Iterator iter = proxyClients.iterator(); iter.hasNext();) {
- ProxyClientConnection client = (ProxyClientConnection)iter.next();
+ for (ProxyClientConnection client : proxyClients) {
client.sendMessage(message);
}
}
}
-
+
public static void reportConnections(int logLevel) {
synchronized(proxyClients) {
Logger.msg(logLevel, "Currently connected proxy clients:");
- for (Iterator iter = proxyClients.iterator(); iter.hasNext();) {
- ProxyClientConnection client = (ProxyClientConnection)iter.next();
+ for (ProxyClientConnection client : proxyClients) {
Logger.msg(logLevel, " "+client);
}
}
}
-
+
public static void shutdownServer() {
if (proxyServer != null) {
Logger.msg(1, "EntityProxyManager: Closing Server.");
proxyServer.stopListening();
}
}
-
+
public static void registerProxyClient(ProxyClientConnection client) {
synchronized(proxyClients) {
proxyClients.add(client);
}
}
-
+
public static void unRegisterProxyClient(ProxyClientConnection client) {
synchronized(proxyClients) {
proxyClients.remove(client);
diff --git a/source/com/c2kernel/entity/proxy/EntityProxyObserver.java b/source/com/c2kernel/entity/proxy/EntityProxyObserver.java index 985143d..3ddb99c 100755..100644 --- a/source/com/c2kernel/entity/proxy/EntityProxyObserver.java +++ b/source/com/c2kernel/entity/proxy/EntityProxyObserver.java @@ -4,14 +4,14 @@ import com.c2kernel.entity.C2KLocalObject; -public interface EntityProxyObserver
+public interface EntityProxyObserver<V extends C2KLocalObject>
{
/**************************************************************************
* Subscribed items are broken apart and fed one by one to these methods.
* Replacement after an event is done by feeding the new memberbase with the same id.
* ID could be an XPath?
**************************************************************************/
- public void add(C2KLocalObject contents);
+ public void add(V contents);
/**************************************************************************
* the 'type' parameter should be an indication of the type of object
@@ -22,4 +22,6 @@ public interface EntityProxyObserver * the subscribe method of ItemProxy.
**************************************************************************/
public void remove(String id);
+
+ public void control(String control, String msg);
}
diff --git a/source/com/c2kernel/entity/proxy/ItemProxy.java b/source/com/c2kernel/entity/proxy/ItemProxy.java index 702dd26..658e0c8 100644 --- a/source/com/c2kernel/entity/proxy/ItemProxy.java +++ b/source/com/c2kernel/entity/proxy/ItemProxy.java @@ -18,7 +18,9 @@ import com.c2kernel.common.InvalidTransitionException; import com.c2kernel.common.ObjectAlreadyExistsException;
import com.c2kernel.common.ObjectNotFoundException;
import com.c2kernel.common.PersistencyException;
-import com.c2kernel.entity.*;
+import com.c2kernel.entity.Item;
+import com.c2kernel.entity.ItemHelper;
+import com.c2kernel.entity.ManageableEntity;
import com.c2kernel.entity.agent.Job;
import com.c2kernel.entity.agent.JobArrayList;
import com.c2kernel.lifecycle.instance.stateMachine.Transitions;
@@ -43,10 +45,11 @@ public class ItemProxy extends EntityProxy throws ObjectNotFoundException
{
super(ior, systemKey);
-
+
}
- public ManageableEntity narrow() throws ObjectNotFoundException
+ @Override
+ public ManageableEntity narrow() throws ObjectNotFoundException
{
try {
return ItemHelper.narrow(mIOR);
@@ -86,29 +89,29 @@ public class ItemProxy extends EntityProxy } catch (Exception e) {
Logger.error(e);
throw new PersistencyException("Could not store property");
- }
+ }
}
/**************************************************************************
*
**************************************************************************/
- protected void requestAction( Job thisJob )
- throws AccessRightsException,
- InvalidTransitionException,
- ObjectNotFoundException,
- InvalidDataException,
+ protected void requestAction( Job thisJob )
+ throws AccessRightsException,
+ InvalidTransitionException,
+ ObjectNotFoundException,
+ InvalidDataException,
PersistencyException,
ObjectAlreadyExistsException
{
- String outcome = thisJob.getOutcomeString();
+ String outcome = thisJob.getOutcomeString();
// check fields that should have been filled in
- if (outcome==null)
- if (thisJob.isOutcomeUsed())
+ if (outcome==null)
+ if (thisJob.isOutcomeUsed())
throw new InvalidDataException("Outcome is required.", "");
else
- outcome="";
-
- if (thisJob.getAgentId() == -1)
- throw new InvalidDataException("No Agent specified.", "");
+ outcome="";
+
+ if (thisJob.getAgentId() == -1)
+ throw new InvalidDataException("No Agent specified.", "");
Logger.msg(7, "ItemProxy - executing "+thisJob.getStepPath()+" for "+thisJob.getAgentName());
requestAction (thisJob.getAgentId(), thisJob.getStepPath(),
@@ -128,7 +131,7 @@ public class ItemProxy extends EntityProxy PersistencyException,
ObjectAlreadyExistsException
{
- ((Item)getEntity()).requestAction( agentId,
+ ((Item)getEntity()).requestAction( agentId,
stepPath,
transitionID,
requestData );
@@ -139,9 +142,9 @@ public class ItemProxy extends EntityProxy **************************************************************************/
public String queryLifeCycle( int agentId,
boolean filter
- )
- throws AccessRightsException,
- ObjectNotFoundException,
+ )
+ throws AccessRightsException,
+ ObjectNotFoundException,
PersistencyException
{
return ((Item)getEntity()).queryLifeCycle( agentId,
@@ -168,15 +171,15 @@ public class ItemProxy extends EntityProxy }
return thisJobList.list;
}
-
+
public ArrayList<Job> getJobList(AgentProxy agent)
throws AccessRightsException,
ObjectNotFoundException,
PersistencyException
{
- return getJobList(agent.getSystemKey());
+ return getJobList(agent.getSystemKey());
}
-
+
private ArrayList<Job> getJobList(int agentId)
throws AccessRightsException,
ObjectNotFoundException,
@@ -189,7 +192,7 @@ public class ItemProxy extends EntityProxy throws AccessRightsException,
ObjectNotFoundException,
PersistencyException {
-
+
ArrayList<Job> jobList = getJobList(agentId);
for (Job job : jobList) {
int transition = job.getPossibleTransition();
@@ -200,7 +203,7 @@ public class ItemProxy extends EntityProxy return null;
}
-
+
public Job getJobByName(String actName, AgentProxy agent)
throws AccessRightsException,
ObjectNotFoundException,
diff --git a/source/com/c2kernel/entity/proxy/MemberControl.java b/source/com/c2kernel/entity/proxy/MemberControl.java deleted file mode 100755 index 5f483ae..0000000 --- a/source/com/c2kernel/entity/proxy/MemberControl.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.c2kernel.entity.proxy;
-
-import com.c2kernel.entity.C2KLocalObject;
-
-
-/**
- * @version $Revision: 1.2 $ $Date: 2004/02/04 11:02:44 $
- * @author $Author: abranson $
- */
-
-public class MemberControl implements C2KLocalObject {
-
- public static final int ERROR = -1;
- public static final int END = -2;
- public static MemberControl theEND = new MemberControl(END, null);
- private int id;
- private String name;
-
- public MemberControl(int code, String msg) {
- this.setID(code);
- switch (code) {
- case MemberControl.ERROR:
- this.setName("ERROR: The path "+msg+" was not found.");
- break;
- case MemberControl.END:
- this.setName("END: End of preload");
- break;
- default:
- this.setName("Unsupported control message code: "+code);
- }
- }
-
- public String toString() {
- return "MemberControl: "+this.getName();
- }
-
- public int getID() { return id; }
- public void setID(int id) { this.id = id; }
- public String getName() { return name; }
- public void setName(String name) { this.name = name; }
- public String getClusterType() { return null; }
-
-}
diff --git a/source/com/c2kernel/entity/proxy/MemberSubscription.java b/source/com/c2kernel/entity/proxy/MemberSubscription.java index ba2d725..157297f 100644 --- a/source/com/c2kernel/entity/proxy/MemberSubscription.java +++ b/source/com/c2kernel/entity/proxy/MemberSubscription.java @@ -8,31 +8,33 @@ import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.entity.C2KLocalObject;
import com.c2kernel.utils.Logger;
-public class MemberSubscription implements Runnable {
+public class MemberSubscription<C extends C2KLocalObject> implements Runnable {
+ public static final String ERROR = "Error";
+ public static final String END = "theEND";
+
EntityProxy subject;
String interest;
// keep the subscriber by weak reference, so it is not kept from the garbage collector if no longer used
- WeakReference<EntityProxyObserver> observerReference;
+ WeakReference<EntityProxyObserver<C>> observerReference;
ArrayList<String> contents = new ArrayList<String>();
boolean preLoad;
- public MemberSubscription(EntityProxy subject, String interest,
- EntityProxyObserver observer, boolean preLoad) {
+ public MemberSubscription(EntityProxyObserver<C> observer, String interest, boolean preLoad) {
setObserver(observer);
this.interest = interest;
- this.subject = subject;
this.preLoad = preLoad;
}
- public void run() {
+ @Override
+ public void run() {
Thread.currentThread().setName("Member Subscription: "+subject.getSystemKey()+":"+interest);
if (preLoad) loadChildren();
}
-
+
private void loadChildren() {
- C2KLocalObject newMember;
- EntityProxyObserver observer = getObserver();
- if (observer == null) return; //reaped
+ C newMember;
+ EntityProxyObserver<C> observer = getObserver();
+ if (observer == null) return; //reaped
try {
// fetch contents of path
String children = subject.queryData(interest+"/all");
@@ -40,22 +42,17 @@ public class MemberSubscription implements Runnable { ArrayList<String> newContents = new ArrayList<String>();
while (tok.hasMoreTokens())
newContents.add(tok.nextToken());
-
+
// look to see what's new
for (String newChild: newContents) {
-
+
// load child object
try {
- newMember = subject.getObject(interest+"/"+newChild);
+ newMember = (C)subject.getObject(interest+"/"+newChild);
contents.remove(newChild);
- } catch (ObjectNotFoundException ex) {
- newMember = new MemberControl(MemberControl.ERROR, "Listed member "+newChild+" was not found.");
- }
- try {
observer.add(newMember);
- } catch (Throwable ex) {
- Logger.error("Error publishing member to "+observer);
- Logger.error(ex);
+ } catch (ObjectNotFoundException ex) {
+ observer.control(ERROR, "Listed member "+newChild+" was not found.");
}
}
// report what's left in old contents as deleted
@@ -65,24 +62,24 @@ public class MemberSubscription implements Runnable { //replace contents arraylist
contents = newContents;
//report that we're done
- observer.add(MemberControl.theEND);
+ observer.control(END, null);
} catch (Exception ex) {
- observer.add(new MemberControl(MemberControl.ERROR, "Query on "+interest+" failed with "+ex));
- }
+ observer.control(ERROR, "Query on "+interest+" failed with "+ex);
+ }
}
-
+
public boolean isRelevant(String path) {
Logger.msg(7, "Checking relevance of "+path+" to "+interest);
return (path.startsWith(interest));
}
-
+
public void update(String path, boolean deleted) {
- EntityProxyObserver observer = getObserver();
+ EntityProxyObserver<C> observer = getObserver();
if (observer == null) return; //reaped
Logger.msg(7, "Processing proxy message path "+path +" for "+observer+". Interest: "+interest+" Was Deleted:"+deleted);
if (!path.startsWith(interest)) // doesn't concern us
return;
-
+
if (path.equals(interest)) // refresh contents
loadChildren();
else {
@@ -92,9 +89,9 @@ public class MemberSubscription implements Runnable { contents.remove(name);
observer.remove(name);
}
- else {
+ else {
try {
- C2KLocalObject newMember = subject.getObject(path);
+ C newMember = (C)subject.getObject(path);
Logger.msg(4, "Adding "+path);
contents.add(name);
observer.add(newMember);
@@ -102,17 +99,20 @@ public class MemberSubscription implements Runnable { Logger.error("Member Subscription: could not load "+path);
Logger.error(e);
}
- }
+ }
}
}
-
- public void setObserver(EntityProxyObserver observer) {
- observerReference = new WeakReference<EntityProxyObserver>(observer);
+
+ public void setObserver(EntityProxyObserver<C> observer) {
+ observerReference = new WeakReference<EntityProxyObserver<C>>(observer);
}
- public EntityProxyObserver getObserver() {
- EntityProxyObserver observer = (EntityProxyObserver)observerReference.get();
- return observer;
+ public void setSubject(EntityProxy subject) {
+ this.subject = subject;
+ }
+
+ public EntityProxyObserver<C> getObserver() {
+ return observerReference.get();
}
}
diff --git a/source/com/c2kernel/entity/proxy/ProxyClientConnection.java b/source/com/c2kernel/entity/proxy/ProxyClientConnection.java index f041012..9687f22 100644 --- a/source/com/c2kernel/entity/proxy/ProxyClientConnection.java +++ b/source/com/c2kernel/entity/proxy/ProxyClientConnection.java @@ -41,17 +41,20 @@ public class ProxyClientConnection implements SocketHandler { }
- public String getName() {
+ @Override
+ public String getName() {
return "Proxy Client Connection";
}
- public boolean isBusy() {
+ @Override
+ public boolean isBusy() {
return clientSocket != null;
}
-
- public synchronized void setSocket(Socket newSocket) {
+
+ @Override
+ public synchronized void setSocket(Socket newSocket) {
try {
- Logger.msg(1, "Proxy Client Connection "+thisClientId+" connect from "+newSocket.getInetAddress()+":"+newSocket.getPort());
+ Logger.msg(1, "Proxy Client Connection "+thisClientId+" connect from "+newSocket.getInetAddress()+":"+newSocket.getPort());
newSocket.setSoTimeout(500);
clientSocket = newSocket;
response = new PrintWriter(clientSocket.getOutputStream(), true);
@@ -66,11 +69,12 @@ public class ProxyClientConnection implements SocketHandler { closeSocket();
}
}
-
+
/**
* Main loop. Reads proxy commands from the client and acts on them.
*/
- public void run() {
+ @Override
+ public void run() {
Thread.currentThread().setName("Proxy Client Connection: "+clientSocket.getInetAddress());
Logger.msg(7, "ProxyClientConnection "+thisClientId+" - Setting up proxy client connection with "+clientSocket.getInetAddress());
try {
@@ -87,7 +91,7 @@ public class ProxyClientConnection implements SocketHandler { } catch (InvalidDataException ex) { // invalid proxy message
Logger.error("ProxyClientConnection "+thisClientId+" - Invalid proxy message: "+input);
}
-
+
}
} catch (IOException ex) {
if (!closing)
@@ -96,68 +100,70 @@ public class ProxyClientConnection implements SocketHandler { closeSocket();
Logger.msg(1, "ProxyClientConnection "+thisClientId+" closed.");
}
-
+
private void processMessage(ProxyMessage message) throws InvalidDataException {
-
+
// proxy disconnection
if (message.getPath().equals(ProxyMessage.BYEPATH)) {
Logger.msg(7, "ProxyClientConnection "+thisClientId+" disconnecting");
closeSocket();
}
-
+
// proxy checking connection
- else if (message.getPath().equals(ProxyMessage.PINGPATH))
+ else if (message.getPath().equals(ProxyMessage.PINGPATH))
response.println(ProxyMessage.pingMessage);
-
+
// new subscription to entity changes
else if (message.getPath().equals(ProxyMessage.ADDPATH)) {
Logger.msg(7, "ProxyClientConnection "+thisClientId+" subscribed to "+message.getSysKey());
synchronized (sysKeys) {
sysKeys.add(new Integer(message.getSysKey()));
- }
+ }
}
-
- // remove of subscription to entity changes
+
+ // remove of subscription to entity changes
else if (message.getPath().equals(ProxyMessage.DELPATH)) {
synchronized (sysKeys) {
sysKeys.remove(new Integer(message.getSysKey()));
- }
+ }
Logger.msg(7, "ProxyClientConnection "+thisClientId+" unsubscribed from "+message.getSysKey());
}
-
+
else // unknown message
- Logger.error("ProxyClientConnection "+thisClientId+" - Unknown message type: "+message);
-
- }
-
+ Logger.error("ProxyClientConnection "+thisClientId+" - Unknown message type: "+message);
+
+ }
+
public synchronized void sendMessage(ProxyMessage message) {
- if (clientSocket==null) return; // idle
- boolean relevant = message.getSysKey() == ProxyMessage.NA;
+ if (clientSocket==null) return; // idle
+ boolean relevant = message.getSysKey() == ProxyMessage.NA;
synchronized (sysKeys) {
- for (Iterator iter = sysKeys.iterator(); iter.hasNext() && !relevant;) {
- Integer thisKey = (Integer)iter.next();
+ for (Iterator<Integer> iter = sysKeys.iterator(); iter.hasNext() && !relevant;) {
+ Integer thisKey = iter.next();
if (thisKey.intValue() == message.getSysKey())
relevant = true;
}
}
if (!relevant) return; // not for our client
-
+
response.println(message);
}
-
- public void shutdown() {
+
+ @Override
+ public void shutdown() {
if (isBusy()) {
closing = true;
Logger.msg("ProxyClientConnection "+thisClientId+" closing.");
closeSocket();
}
}
-
- public String toString() {
+
+ @Override
+ public String toString() {
if (clientSocket == null) return thisClientId+": idle";
else return thisClientId+": "+clientSocket.getInetAddress();
}
-
+
private synchronized void closeSocket() {
if (clientSocket==null) return;
try {
@@ -171,9 +177,9 @@ public class ProxyClientConnection implements SocketHandler { synchronized (sysKeys) {
sysKeys = null;
}
-
+
clientSocket = null;
}
-
+
}
diff --git a/source/com/c2kernel/entity/proxy/ProxyMessage.java b/source/com/c2kernel/entity/proxy/ProxyMessage.java index 66f1f34..b312a44 100755..100644 --- a/source/com/c2kernel/entity/proxy/ProxyMessage.java +++ b/source/com/c2kernel/entity/proxy/ProxyMessage.java @@ -18,24 +18,24 @@ import com.c2kernel.common.InvalidDataException; **************************************************************************/
public class ProxyMessage {
-
+
// special server message paths
- public static final String BYEPATH = "bye";
+ public static final String BYEPATH = "bye";
public static final String ADDPATH = "add";
public static final String DELPATH = "del";
public static final String PINGPATH = "ping";
public static final boolean ADDED = false;
public static final boolean DELETED = true;
public static final int NA = -1;
-
- static ProxyMessage byeMessage = new ProxyMessage(NA, BYEPATH, ADDED);
- static ProxyMessage pingMessage = new ProxyMessage(NA, PINGPATH, ADDED);
-
+
+ static ProxyMessage byeMessage = new ProxyMessage(NA, BYEPATH, ADDED);
+ static ProxyMessage pingMessage = new ProxyMessage(NA, PINGPATH, ADDED);
+
private int sysKey = NA;
private String path = "";
private String server = null;
private boolean state = ADDED;
-
+
public ProxyMessage() {
super();
}
@@ -45,7 +45,7 @@ public class ProxyMessage { setPath(path);
setState(state);
}
-
+
public ProxyMessage(String line) throws InvalidDataException, IOException {
if (line == null)
throw new IOException("Null proxy message");
@@ -56,10 +56,10 @@ public class ProxyMessage { path = tok.nextToken();
if (path.startsWith("-")) {
state = DELETED;
- path = path.substring(1);
+ path = path.substring(1);
}
}
-
+
public ProxyMessage(DatagramPacket packet) throws InvalidDataException, IOException {
this(new String(packet.getData()));
}
@@ -67,7 +67,7 @@ public class ProxyMessage { public int getSysKey() {
return sysKey;
}
-
+
public void setSysKey(int sysKey) {
this.sysKey = sysKey;
}
@@ -75,30 +75,31 @@ public class ProxyMessage { public String getPath() {
return path;
}
-
+
public void setPath(String newPath) {
this.path = newPath;
}
-
+
public boolean getState() {
return state;
}
-
+
public void setState(boolean state) {
this.state = state;
}
-
- public String toString() {
- return sysKey+":"+(state?"-":"")+path;
+
+ @Override
+ public String toString() {
+ return sysKey+":"+(state?"-":"")+path;
}
public DatagramPacket getPacket(ProxySubscriber host) {
return getPacket(host.getHost(), host.getPort());
}
-
+
public DatagramPacket getPacket(InetAddress host, int port) {
byte[] packetString = toString().getBytes();
return new DatagramPacket(packetString, packetString.length, host, port);
- }
+ }
public String getServer() {
return server;
diff --git a/source/com/c2kernel/entity/proxy/ProxyServerConnection.java b/source/com/c2kernel/entity/proxy/ProxyServerConnection.java index 191492f..6807953 100755..100644 --- a/source/com/c2kernel/entity/proxy/ProxyServerConnection.java +++ b/source/com/c2kernel/entity/proxy/ProxyServerConnection.java @@ -10,7 +10,11 @@ package com.c2kernel.entity.proxy;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
+import java.io.PrintWriter;
import java.net.Socket;
import com.c2kernel.common.InvalidDataException;
@@ -30,7 +34,7 @@ public class ProxyServerConnection extends Thread PrintWriter serverStream;
boolean listening = false;
static boolean isServer = false;
-
+
/**
* Create an entity proxy manager to listen for proxy events and reap unused proxies
*/
@@ -43,8 +47,9 @@ public class ProxyServerConnection extends Thread listening = true;
start();
}
-
- public void run() {
+
+ @Override
+ public void run() {
Thread.currentThread().setName("Proxy Client Connection Listener to "+serverName+":"+serverPort);
while (listening) {
try {
@@ -72,14 +77,14 @@ public class ProxyServerConnection extends Thread try {
serverStream.close();
serverConnection.close();
- } catch (IOException e1) { }
+ } catch (IOException e1) { }
+
-
serverStream = null;
serverConnection = null;
}
}
-
+
if (serverStream != null) {
try {
Logger.msg(1, "Disconnecting from proxy server on "+serverName+":"+serverPort);
@@ -90,9 +95,9 @@ public class ProxyServerConnection extends Thread } catch (Exception e) {
Logger.error("Error disconnecting from proxy server.");
}
- }
+ }
}
-
+
public void connect() {
Logger.msg(3, "ProxyServerConnection - connecting to proxy server on "+serverName+":"+serverPort);
try {
@@ -111,12 +116,12 @@ public class ProxyServerConnection extends Thread serverIsActive = false;
}
}
-
+
public void shutdown() {
Logger.msg("Proxy Client: flagging shutdown.");
listening = false;
}
-
+
/**
* @param sub
*/
diff --git a/source/com/c2kernel/entity/proxy/ProxySubscriber.java b/source/com/c2kernel/entity/proxy/ProxySubscriber.java index 67d0a60..8cb85fa 100755..100644 --- a/source/com/c2kernel/entity/proxy/ProxySubscriber.java +++ b/source/com/c2kernel/entity/proxy/ProxySubscriber.java @@ -17,9 +17,9 @@ import com.c2kernel.common.InvalidDataException; public class ProxySubscriber {
- private InetAddress host;
+ private InetAddress host;
private int port;
- public ArrayList sysKeys = new ArrayList();
+ public ArrayList<?> sysKeys = new ArrayList<Object>();
public ProxySubscriber(DatagramPacket packet) throws InvalidDataException {
host = packet.getAddress();
|
