From a1f0ecbb6a2bea6aa214322c412af2f3c5ce124b Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 7 May 2014 17:33:13 +0200 Subject: Agent now extends Item, so they can have workflows. All traces of the old 'Entity' superclasses should be removed, including proxies and paths. Very large change, breaks API compatibility with CRISTAL 2.x. Fixes #135 --- .../java/com/c2kernel/entity/proxy/ItemProxy.java | 285 +++++++++++++++++---- 1 file changed, 232 insertions(+), 53 deletions(-) (limited to 'src/main/java/com/c2kernel/entity/proxy/ItemProxy.java') diff --git a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java index 0e6859d..355acd8 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java @@ -10,25 +10,40 @@ package com.c2kernel.entity.proxy; +import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; + +import org.exolab.castor.mapping.MappingException; +import org.exolab.castor.xml.MarshalException; +import org.exolab.castor.xml.ValidationException; import com.c2kernel.collection.Collection; -import com.c2kernel.collection.CollectionMember; +import com.c2kernel.collection.CollectionArrayList; import com.c2kernel.common.AccessRightsException; import com.c2kernel.common.InvalidDataException; import com.c2kernel.common.InvalidTransitionException; import com.c2kernel.common.ObjectAlreadyExistsException; import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.common.PersistencyException; +import com.c2kernel.entity.C2KLocalObject; import com.c2kernel.entity.Item; import com.c2kernel.entity.ItemHelper; -import com.c2kernel.entity.ManageableEntity; import com.c2kernel.entity.agent.Job; import com.c2kernel.entity.agent.JobArrayList; +import com.c2kernel.lifecycle.instance.CompositeActivity; import com.c2kernel.lifecycle.instance.Workflow; +import com.c2kernel.lookup.InvalidItemPathException; +import com.c2kernel.lookup.ItemPath; +import com.c2kernel.lookup.Path; import com.c2kernel.persistency.ClusterStorage; +import com.c2kernel.persistency.ClusterStorageException; import com.c2kernel.persistency.outcome.Viewpoint; import com.c2kernel.process.Gateway; +import com.c2kernel.property.Property; +import com.c2kernel.property.PropertyArrayList; +import com.c2kernel.utils.CastorXMLUtility; import com.c2kernel.utils.Logger; /****************************************************************************** @@ -38,43 +53,86 @@ import com.c2kernel.utils.Logger; * @version $Revision: 1.25 $ $Date: 2005/05/10 11:40:09 $ * @author $Author: abranson $ ******************************************************************************/ -public class ItemProxy extends EntityProxy +public class ItemProxy { + protected Item mItem = null; + protected org.omg.CORBA.Object mIOR; + protected int mSystemKey; + protected Path mPath; + private final HashMap, ProxyObserver> + mSubscriptions; + /************************************************************************** - * + * **************************************************************************/ protected ItemProxy( org.omg.CORBA.Object ior, int systemKey) throws ObjectNotFoundException { - super(ior, systemKey); + Logger.msg(8, "ItemProxy::initialise() - Initialising entity " +systemKey); + + mIOR = ior; + mSystemKey = systemKey; + mSubscriptions = new HashMap, ProxyObserver>(); + try { + mPath = new ItemPath(systemKey); + } catch (InvalidItemPathException e) { + throw new ObjectNotFoundException(); + } } + + public int getSystemKey() + { + return mSystemKey; + } + + public Path getPath() { + return mPath; + } + + protected Item getItem() throws ObjectNotFoundException { + if (mItem == null) + mItem = narrow(); + return mItem; + } - @Override - public ManageableEntity narrow() throws ObjectNotFoundException + public Item narrow() throws ObjectNotFoundException { try { return ItemHelper.narrow(mIOR); } catch (org.omg.CORBA.BAD_PARAM ex) { } throw new ObjectNotFoundException("CORBA Object was not an Item, or the server is down."); } - /************************************************************************** + /** + * @throws MappingException + * @throws IOException + * @throws ValidationException + * @throws MarshalException ************************************************************************ * * **************************************************************************/ - public void initialise( int agentId, - String itemProps, - String workflow ) + public void initialise( int agentId, + PropertyArrayList itemProps, + CompositeActivity workflow, + CollectionArrayList colls + ) throws AccessRightsException, InvalidDataException, PersistencyException, - ObjectNotFoundException + ObjectNotFoundException, MarshalException, ValidationException, IOException, MappingException { Logger.msg(7, "ItemProxy::initialise - started"); - - ((Item)getEntity()).initialise( agentId, itemProps, workflow ); + CastorXMLUtility xml = Gateway.getMarshaller(); + if (itemProps == null) throw new InvalidDataException("No initial properties supplied"); + String propString = xml.marshall(itemProps); + String wfString = ""; + if (workflow != null) wfString = xml.marshall(workflow); + String collString = ""; + if (colls != null) collString = xml.marshall(colls); + + getItem().initialise( agentId, propString, wfString, collString); } public void setProperty(AgentProxy agent, String name, String value) @@ -100,7 +158,7 @@ public class ItemProxy extends EntityProxy /************************************************************************** * **************************************************************************/ - protected void requestAction( Job thisJob ) + public void requestAction( Job thisJob ) throws AccessRightsException, InvalidTransitionException, ObjectNotFoundException, @@ -120,44 +178,10 @@ public class ItemProxy extends EntityProxy throw new InvalidDataException("No Agent specified.", ""); Logger.msg(7, "ItemProxy - executing "+thisJob.getStepPath()+" for "+thisJob.getAgentName()); - requestAction (thisJob.getAgentId(), thisJob.getStepPath(), + getItem().requestAction (thisJob.getAgentId(), thisJob.getStepPath(), thisJob.getTransition().getId(), outcome); } - //requestData is xmlString - public void requestAction( int agentId, - String stepPath, - int transitionID, - String requestData - ) - throws AccessRightsException, - InvalidTransitionException, - ObjectNotFoundException, - InvalidDataException, - PersistencyException, - ObjectAlreadyExistsException - { - ((Item)getEntity()).requestAction( agentId, - stepPath, - transitionID, - requestData ); - } - - /************************************************************************** - * - **************************************************************************/ - public String queryLifeCycle( int agentId, - boolean filter - ) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException - { - return ((Item)getEntity()).queryLifeCycle( agentId, - filter ); - } - - /************************************************************************** * **************************************************************************/ @@ -168,7 +192,7 @@ public class ItemProxy extends EntityProxy { JobArrayList thisJobList; try { - String jobs = queryLifeCycle(agentId, filter); + String jobs = getItem().queryLifeCycle(agentId, filter); thisJobList = (JobArrayList)Gateway.getMarshaller().unmarshall(jobs); } catch (Exception e) { @@ -208,8 +232,8 @@ public class ItemProxy extends EntityProxy } - public Collection getCollection(String collName) throws ObjectNotFoundException { - return (Collection)getObject(ClusterStorage.COLLECTION+"/"+collName); + public Collection getCollection(String collName) throws ObjectNotFoundException { + return (Collection)getObject(ClusterStorage.COLLECTION+"/"+collName); } public Workflow getWorkflow() throws ObjectNotFoundException { @@ -226,4 +250,159 @@ public class ItemProxy extends EntityProxy PersistencyException { return getJobByName(actName, agent.getSystemKey()); } + + /** + * If this is reaped, clear out the cache for it too. + */ + @Override + protected void finalize() throws Throwable { + Logger.msg(7, "Proxy "+mSystemKey+" reaped"); + Gateway.getStorage().clearCache(mSystemKey, null); + Gateway.getProxyManager().removeProxy(mSystemKey); + super.finalize(); + } + + /************************************************************************** + * + **************************************************************************/ + public String queryData( String path ) + throws ObjectNotFoundException + { + + try { + Logger.msg(7, "EntityProxy.queryData() - "+mSystemKey+"/"+path); + if (path.endsWith("all")) { + Logger.msg(7, "EntityProxy.queryData() - listing contents"); + String[] result = Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()-3)); + StringBuffer retString = new StringBuffer(); + for (int i = 0; i < result.length; i++) { + retString.append(result[i]); + if (i"+e.getMessage()+""; + } + } + + public String[] getContents( String path ) throws ObjectNotFoundException { + try { + return Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length())); + } catch (ClusterStorageException e) { + throw new ObjectNotFoundException(e.toString()); + } + } + + + /************************************************************************** + * + **************************************************************************/ + public C2KLocalObject getObject( String xpath ) + throws ObjectNotFoundException + { + // load from storage, falling back to proxy loader if not found in others + try + { + return Gateway.getStorage().get( mSystemKey, xpath , null); + } + catch( ClusterStorageException ex ) + { + Logger.msg(4, "Exception loading object :"+mSystemKey+"/"+xpath); + throw new ObjectNotFoundException( ex.toString() ); + } + } + + + + public String getProperty( String name ) + throws ObjectNotFoundException + { + Logger.msg(5, "Get property "+name+" from syskey/"+mSystemKey); + Property prop = (Property)getObject("Property/"+name); + try + { + return prop.getValue(); + } + catch (NullPointerException ex) + { + throw new ObjectNotFoundException(); + } + } + + public String getName() + { + try { + return getProperty("Name"); + } catch (ObjectNotFoundException ex) { + return null; + } + } + + + + + /************************************************************************** + * Subscription methods + **************************************************************************/ + + public void subscribe(MemberSubscription newSub) { + + newSub.setSubject(this); + synchronized (this){ + mSubscriptions.put( newSub, newSub.getObserver() ); + } + new Thread(newSub).start(); + Logger.msg(7, "Subscribed "+newSub.getObserver().getClass().getName()+" for "+newSub.interest); + } + + public void unsubscribe(ProxyObserver observer) + { + synchronized (this){ + for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { + MemberSubscription thisSub = e.next(); + if (mSubscriptions.get( thisSub ) == observer) { + e.remove(); + Logger.msg(7, "Unsubscribed "+observer.getClass().getName()); + } + } + } + } + + public void dumpSubscriptions(int logLevel) { + if (mSubscriptions.size() == 0) return; + Logger.msg(logLevel, "Subscriptions to proxy "+mSystemKey+":"); + synchronized(this) { + for (MemberSubscription element : mSubscriptions.keySet()) { + ProxyObserver obs = element.getObserver(); + if (obs != null) + Logger.msg(logLevel, " "+element.getObserver().getClass().getName()+" subscribed to "+element.interest); + else + Logger.msg(logLevel, " Phantom subscription to "+element.interest); + } + } + } + + public void notify(ProxyMessage message) { + Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey); + synchronized (this){ + if (!message.getServer().equals(ProxyManager.serverName)) + Gateway.getStorage().clearCache(mSystemKey, message.getPath()); + for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { + MemberSubscription newSub = e.next(); + if (newSub.getObserver() == null) { // phantom + Logger.msg(4, "Removing phantom subscription to "+newSub.interest); + e.remove(); + } + else + newSub.update(message.getPath(), message.getState()); + } + } + } } -- cgit v1.2.3 From 40ef76037aeac4ee2b9d857a092e5ea026c0bb5c Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 14 May 2014 13:23:26 +0200 Subject: Refactored Proxy update notification server into its own class and thread, so executions return before proxy messages are sent. Another deadlock suspect. --- .../java/com/c2kernel/entity/proxy/ItemProxy.java | 2 +- .../entity/proxy/ProxyClientConnection.java | 3 +- .../com/c2kernel/entity/proxy/ProxyManager.java | 72 +------------- .../com/c2kernel/entity/proxy/ProxyServer.java | 106 +++++++++++++++++++++ src/main/java/com/c2kernel/lookup/LDAPLookup.java | 5 +- .../persistency/ClusterStorageManager.java | 5 +- src/main/java/com/c2kernel/process/Gateway.java | 21 ++-- 7 files changed, 129 insertions(+), 85 deletions(-) create mode 100644 src/main/java/com/c2kernel/entity/proxy/ProxyServer.java (limited to 'src/main/java/com/c2kernel/entity/proxy/ItemProxy.java') diff --git a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java index 355acd8..f3a2f44 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java @@ -392,7 +392,7 @@ public class ItemProxy public void notify(ProxyMessage message) { Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey); synchronized (this){ - if (!message.getServer().equals(ProxyManager.serverName)) + if (Gateway.getProxyServer()== null || !message.getServer().equals(Gateway.getProxyServer().getServerName())) Gateway.getStorage().clearCache(mSystemKey, message.getPath()); for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { MemberSubscription newSub = e.next(); diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java index 5abdb16..3a7e129 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java @@ -11,6 +11,7 @@ import java.util.ArrayList; import java.util.Iterator; import com.c2kernel.common.InvalidDataException; +import com.c2kernel.process.Gateway; import com.c2kernel.utils.Logger; import com.c2kernel.utils.server.SocketHandler; @@ -36,7 +37,7 @@ public class ProxyClientConnection implements SocketHandler { public ProxyClientConnection() { super(); thisClientId = ++clientId; - ProxyManager.registerProxyClient(this); + Gateway.getProxyServer().registerProxyClient(this); Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready."); } diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java index d19e38f..b217f3e 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java @@ -26,7 +26,6 @@ import com.c2kernel.process.Gateway; import com.c2kernel.property.Property; import com.c2kernel.utils.Logger; import com.c2kernel.utils.SoftCache; -import com.c2kernel.utils.server.SimpleTCPIPServer; public class ProxyManager @@ -35,11 +34,6 @@ public class ProxyManager HashMap treeSubscribers = new HashMap(); HashMap connections = new HashMap(); - // server objects - static ArrayList proxyClients = new ArrayList(); - static SimpleTCPIPServer proxyServer = null; - static String serverName = null; - /** * Create a proxy manager to listen for proxy events and reap unused proxies */ @@ -268,70 +262,6 @@ public class ProxyManager } - /************************************************************************** - * Static Proxy Server methods - **************************************************************************/ - - /** - * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops - * @param c2kProps - */ - public static void initServer() - { - Logger.msg(5, "ProxyManager::initServer - Starting....."); - int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0); - serverName = Gateway.getProperties().getProperty("ItemServer.name"); - if (port == 0) { - Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes."); - return; - } - - // set up the proxy server - try { - Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port); - proxyServer = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200); - proxyServer.startListening(); - } catch (Exception ex) { - Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes."); - Logger.error(ex); - } - } - - public static void sendProxyEvent(ProxyMessage message) { - if (proxyServer != null && message.getPath() != null) - synchronized(proxyClients) { - for (ProxyClientConnection client : proxyClients) { - client.sendMessage(message); - } - } - } - - public static void reportConnections(int logLevel) { - synchronized(proxyClients) { - Logger.msg(logLevel, "Currently connected proxy clients:"); - for (ProxyClientConnection client : proxyClients) { - Logger.msg(logLevel, " "+client); - } - } - } - - public static void shutdownServer() { - if (proxyServer != null) { - Logger.msg(1, "ProxyManager: Closing Server."); - proxyServer.stopListening(); - } - } - - public static void registerProxyClient(ProxyClientConnection client) { - synchronized(proxyClients) { - proxyClients.add(client); - } - } - - public static void unRegisterProxyClient(ProxyClientConnection client) { - synchronized(proxyClients) { - proxyClients.remove(client); - } - } + } diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java b/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java new file mode 100644 index 0000000..c576cda --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java @@ -0,0 +1,106 @@ +package com.c2kernel.entity.proxy; + +import java.util.ArrayList; +import java.util.concurrent.LinkedBlockingQueue; + +import com.c2kernel.process.Gateway; +import com.c2kernel.utils.Logger; +import com.c2kernel.utils.server.SimpleTCPIPServer; + +public class ProxyServer implements Runnable { + + // server objects + ArrayList proxyClients; + SimpleTCPIPServer proxyListener = null; + String serverName = null; + boolean keepRunning = true; + LinkedBlockingQueue messageQueue; + + public ProxyServer(String serverName) { + Logger.msg(5, "ProxyManager::initServer - Starting....."); + int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0); + this.serverName = serverName; + this.proxyClients = new ArrayList(); + this.messageQueue = new LinkedBlockingQueue(); + + if (port == 0) { + Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes."); + return; + } + + // set up the proxy server + try { + Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port); + proxyListener = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200); + proxyListener.startListening(); + } catch (Exception ex) { + Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes."); + Logger.error(ex); + } + // start the message queue delivery thread + new Thread(this).start(); + } + + @Override + public void run() { + + while(keepRunning) { + ProxyMessage message = messageQueue.poll(); + if (message != null) { + synchronized(proxyClients) { + for (ProxyClientConnection client : proxyClients) { + client.sendMessage(message); + } + } + } else + try { + synchronized(this) { wait(); } + } catch (InterruptedException e) { } + } + + } + + public String getServerName() { + return serverName; + } + + public void sendProxyEvent(ProxyMessage message) { + try { + synchronized(this) { + messageQueue.put(message); + notify(); + } + } catch (InterruptedException e) { } + } + + public void reportConnections(int logLevel) { + synchronized(proxyClients) { + Logger.msg(logLevel, "Currently connected proxy clients:"); + for (ProxyClientConnection client : proxyClients) { + Logger.msg(logLevel, " "+client); + } + } + } + + public void shutdownServer() { + Logger.msg(1, "ProxyManager: Closing Server."); + proxyListener.stopListening(); + synchronized(this) { + keepRunning = false; + notify(); + } + } + + public void registerProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.add(client); + } + } + + public void unRegisterProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.remove(client); + } + } + +} diff --git a/src/main/java/com/c2kernel/lookup/LDAPLookup.java b/src/main/java/com/c2kernel/lookup/LDAPLookup.java index 116362e..eae803b 100644 --- a/src/main/java/com/c2kernel/lookup/LDAPLookup.java +++ b/src/main/java/com/c2kernel/lookup/LDAPLookup.java @@ -12,7 +12,6 @@ import com.c2kernel.common.ObjectCannotBeUpdated; import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.entity.TraceableEntity; import com.c2kernel.entity.agent.ActiveEntity; -import com.c2kernel.entity.proxy.ProxyManager; import com.c2kernel.entity.proxy.ProxyMessage; import com.c2kernel.process.Gateway; import com.c2kernel.property.PropertyDescription; @@ -231,7 +230,7 @@ public class LDAPLookup LDAPEntry newEntry = new LDAPEntry(path.getFullDN(),attrSet); LDAPLookupUtils.addEntry(getConnection(),newEntry); if (path instanceof DomainPath) - ProxyManager.sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.ADDED)); + Gateway.getProxyServer().sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.ADDED)); return newEntry; } catch (LDAPException ex) { if (ex.getResultCode() == LDAPException.ENTRY_ALREADY_EXISTS) @@ -251,7 +250,7 @@ public class LDAPLookup throw new ObjectCannotBeUpdated(ex.getLDAPErrorMessage(), ""); } if (path instanceof DomainPath) { - ProxyManager.sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.DELETED)); + Gateway.getProxyServer().sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.DELETED)); } } diff --git a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java index d0c3f77..20857c6 100644 --- a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java +++ b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java @@ -10,7 +10,6 @@ import java.util.StringTokenizer; import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.entity.C2KLocalObject; import com.c2kernel.entity.agent.JobList; -import com.c2kernel.entity.proxy.ProxyManager; import com.c2kernel.entity.proxy.ProxyMessage; import com.c2kernel.events.History; import com.c2kernel.persistency.outcome.Outcome; @@ -291,7 +290,7 @@ public class ClusterStorageManager { if (Logger.doLog(9)) dumpCacheContents(9); // transmit proxy event - ProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED)); + Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED)); } /** Deletes a cluster from all writers */ @@ -317,7 +316,7 @@ public class ClusterStorageManager { // transmit proxy event - ProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED)); + Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED)); } public void clearCache(Integer sysKeyIntObj, String path) { diff --git a/src/main/java/com/c2kernel/process/Gateway.java b/src/main/java/com/c2kernel/process/Gateway.java index 6c7b68d..01cc202 100644 --- a/src/main/java/com/c2kernel/process/Gateway.java +++ b/src/main/java/com/c2kernel/process/Gateway.java @@ -14,6 +14,7 @@ import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.entity.CorbaServer; import com.c2kernel.entity.proxy.AgentProxy; import com.c2kernel.entity.proxy.ProxyManager; +import com.c2kernel.entity.proxy.ProxyServer; import com.c2kernel.lookup.AgentPath; import com.c2kernel.lookup.LDAPLookup; import com.c2kernel.lookup.LDAPProperties; @@ -57,7 +58,8 @@ public class Gateway static private boolean orbDestroyed = false; static private LDAPLookup mLDAPLookup; static private TransactionManager mStorage; - static private ProxyManager mProxyManager; + static private ProxyManager mProxyManager; + static private ProxyServer mProxyServer; static private CorbaServer mCorbaServer; static private CastorXMLUtility mMarshaller; static private AgentProxy mCurrentUser = null; @@ -151,13 +153,13 @@ public class Gateway mLDAPLookup.install(); // start entity proxy server - ProxyManager.initServer(); + mProxyServer = new ProxyServer(mC2KProps.getProperty("ItemServer.name")); // Init ORB - set various config - String serverName = getProperty("ItemServer.name"); + String serverName = mC2KProps.getProperty("ItemServer.name"); if (serverName != null) mC2KProps.put("com.sun.CORBA.ORBServerHost", serverName); - String serverPort = getProperty("ItemServer.iiop", "1500"); + String serverPort = mC2KProps.getProperty("ItemServer.iiop", "1500"); mC2KProps.put("com.sun.CORBA.ORBServerPort", serverPort); //TODO: externalize this (or replace corba completely) mC2KProps.put("com.sun.CORBA.POA.ORBServerId", "1"); @@ -356,11 +358,13 @@ public class Gateway mLDAPLookup.disconnect(); mLDAPLookup = null; - // shut down proxy manager + // shut down proxy manager & server + if (mProxyServer != null) + mProxyServer.shutdownServer(); if (mProxyManager != null) mProxyManager.shutdown(); mProxyManager = null; - ProxyManager.shutdownServer(); + // close log consoles Logger.closeConsole(); @@ -410,6 +414,11 @@ public class Gateway return mProxyManager; } + + public static ProxyServer getProxyServer() { + return mProxyServer; + } + static public String getCentreId() { return getProperty("LocalCentre"); } -- cgit v1.2.3 From 62c7a46967e949304e6b242854526463aae7ee17 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Fri, 16 May 2014 10:38:32 +0200 Subject: item.request returns the final outcome, which may be modified during the execution (e.g. in the case of Predefined Steps). Fixes #136 --- src/main/idl/Entity.idl | 2 +- .../java/com/c2kernel/entity/ItemImplementation.java | 5 +++-- src/main/java/com/c2kernel/entity/TraceableEntity.java | 4 ++-- .../java/com/c2kernel/entity/agent/ActiveEntity.java | 4 ++-- .../java/com/c2kernel/entity/proxy/AgentProxy.java | 18 ++++++++++-------- src/main/java/com/c2kernel/entity/proxy/ItemProxy.java | 4 ++-- .../java/com/c2kernel/lifecycle/instance/Activity.java | 4 +++- .../c2kernel/lifecycle/instance/CompositeActivity.java | 4 ++-- .../java/com/c2kernel/lifecycle/instance/Workflow.java | 4 ++-- 9 files changed, 27 insertions(+), 22 deletions(-) (limited to 'src/main/java/com/c2kernel/entity/proxy/ItemProxy.java') diff --git a/src/main/idl/Entity.idl b/src/main/idl/Entity.idl index 2aa9188..51884a3 100644 --- a/src/main/idl/Entity.idl +++ b/src/main/idl/Entity.idl @@ -101,7 +101,7 @@ module entity * @throws PersistencyException There was a problem committing the changes to storage. * @throws ObjectAlreadyExistsException Not normally thrown, but reserved for PredefinedSteps to throw if they need to. **/ - void requestAction( in unsigned long agentID, + string requestAction( in unsigned long agentID, in string stepPath, in unsigned long transitionID, in string requestData diff --git a/src/main/java/com/c2kernel/entity/ItemImplementation.java b/src/main/java/com/c2kernel/entity/ItemImplementation.java index e0d107a..b12e105 100644 --- a/src/main/java/com/c2kernel/entity/ItemImplementation.java +++ b/src/main/java/com/c2kernel/entity/ItemImplementation.java @@ -113,7 +113,7 @@ public class ItemImplementation implements ItemOperations { @Override - public void requestAction(int agentId, String stepPath, int transitionID, + public String requestAction(int agentId, String stepPath, int transitionID, String requestData) throws AccessRightsException, InvalidTransitionException, ObjectNotFoundException, InvalidDataException, PersistencyException, @@ -128,7 +128,7 @@ public class ItemImplementation implements ItemOperations { Workflow lifeCycle = (Workflow) mStorage.get(mSystemKey, ClusterStorage.LIFECYCLE + "/workflow", null); - lifeCycle.requestAction(agent, stepPath, mSystemKey, + String finalOutcome = lifeCycle.requestAction(agent, stepPath, mSystemKey, transitionID, requestData); // store the workflow if we've changed the state of the domain @@ -136,6 +136,7 @@ public class ItemImplementation implements ItemOperations { if (!(stepPath.startsWith("workflow/predefined"))) mStorage.put(mSystemKey, lifeCycle, null); + return finalOutcome; // Normal operation exceptions } catch (AccessRightsException ex) { Logger.msg("Propagating AccessRightsException back to the calling agent"); diff --git a/src/main/java/com/c2kernel/entity/TraceableEntity.java b/src/main/java/com/c2kernel/entity/TraceableEntity.java index ffd5859..a0980ee 100644 --- a/src/main/java/com/c2kernel/entity/TraceableEntity.java +++ b/src/main/java/com/c2kernel/entity/TraceableEntity.java @@ -111,7 +111,7 @@ public class TraceableEntity extends ItemPOA **************************************************************************/ //requestdata is xmlstring @Override - public void requestAction( int agentId, + public String requestAction( int agentId, String stepPath, int transitionID, String requestData @@ -124,7 +124,7 @@ public class TraceableEntity extends ItemPOA ObjectAlreadyExistsException { synchronized (this) { - mItemImpl.requestAction(agentId, stepPath, transitionID, requestData); + return mItemImpl.requestAction(agentId, stepPath, transitionID, requestData); } } diff --git a/src/main/java/com/c2kernel/entity/agent/ActiveEntity.java b/src/main/java/com/c2kernel/entity/agent/ActiveEntity.java index c59b0fe..a799b62 100644 --- a/src/main/java/com/c2kernel/entity/agent/ActiveEntity.java +++ b/src/main/java/com/c2kernel/entity/agent/ActiveEntity.java @@ -120,14 +120,14 @@ public class ActiveEntity extends AgentPOA } @Override - public void requestAction(int agentID, String stepPath, int transitionID, + public String requestAction(int agentID, String stepPath, int transitionID, String requestData) throws AccessRightsException, InvalidTransitionException, ObjectNotFoundException, InvalidDataException, PersistencyException, ObjectAlreadyExistsException { synchronized (this) { - mAgentImpl.requestAction(agentID, stepPath, transitionID, requestData); + return mAgentImpl.requestAction(agentID, stepPath, transitionID, requestData); } } diff --git a/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java b/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java index 29550d4..b6566a8 100644 --- a/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java @@ -82,7 +82,7 @@ public class AgentProxy extends ItemProxy * @param job - the job to execute * @throws ScriptErrorException */ - public void execute(ItemProxy item, Job job) + public String execute(ItemProxy item, Job job) throws AccessRightsException, InvalidTransitionException, ObjectNotFoundException, @@ -152,12 +152,14 @@ public class AgentProxy extends ItemProxy job.setAgentId(getSystemKey()); Logger.msg(3, "AgentProxy - submitting job to item proxy"); - item.requestAction(job); + String result = item.requestAction(job); if (Logger.doLog(3)) { Date timeNow = new Date(); long secsNow = (timeNow.getTime()-startTime.getTime())/1000; Logger.msg(3, "Execution took "+secsNow+" seconds"); } + + return result; } private Object callScript(ItemProxy item, Job job) throws ScriptingEngineException { @@ -178,7 +180,7 @@ public class AgentProxy extends ItemProxy * @throws ObjectAlreadyExistsException * @throws ScriptErrorException */ - public void execute(Job job) + public String execute(Job job) throws AccessRightsException, InvalidDataException, InvalidTransitionException, @@ -189,13 +191,13 @@ public class AgentProxy extends ItemProxy { try { ItemProxy targetItem = Gateway.getProxyManager().getProxy(new ItemPath(job.getItemSysKey())); - execute(targetItem, job); + return execute(targetItem, job); } catch (InvalidItemPathException e) { throw new ObjectNotFoundException("Job contained invalid item sysKey: "+job.getItemSysKey(), ""); } } - public void execute(ItemProxy item, String predefStep, C2KLocalObject obj) + public String execute(ItemProxy item, String predefStep, C2KLocalObject obj) throws AccessRightsException, InvalidDataException, InvalidTransitionException, @@ -210,10 +212,10 @@ public class AgentProxy extends ItemProxy Logger.error(ex); throw new InvalidDataException("Error on marshall", ""); } - execute(item, predefStep, param); + return execute(item, predefStep, param); } - public void execute(ItemProxy item, String predefStep, String... params) + public String execute(ItemProxy item, String predefStep, String... params) throws AccessRightsException, InvalidDataException, InvalidTransitionException, @@ -221,7 +223,7 @@ public class AgentProxy extends ItemProxy PersistencyException, ObjectAlreadyExistsException { - item.getItem().requestAction(getSystemKey(), "workflow/predefined/"+predefStep, PredefinedStep.DONE, PredefinedStep.bundleData(params)); + return item.getItem().requestAction(getSystemKey(), "workflow/predefined/"+predefStep, PredefinedStep.DONE, PredefinedStep.bundleData(params)); } /** Wrappers for scripts */ diff --git a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java index f3a2f44..454da6d 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java @@ -158,7 +158,7 @@ public class ItemProxy /************************************************************************** * **************************************************************************/ - public void requestAction( Job thisJob ) + public String requestAction( Job thisJob ) throws AccessRightsException, InvalidTransitionException, ObjectNotFoundException, @@ -178,7 +178,7 @@ public class ItemProxy throw new InvalidDataException("No Agent specified.", ""); Logger.msg(7, "ItemProxy - executing "+thisJob.getStepPath()+" for "+thisJob.getAgentName()); - getItem().requestAction (thisJob.getAgentId(), thisJob.getStepPath(), + return getItem().requestAction (thisJob.getAgentId(), thisJob.getStepPath(), thisJob.getTransition().getId(), outcome); } diff --git a/src/main/java/com/c2kernel/lifecycle/instance/Activity.java b/src/main/java/com/c2kernel/lifecycle/instance/Activity.java index 8e578c2..b86e200 100644 --- a/src/main/java/com/c2kernel/lifecycle/instance/Activity.java +++ b/src/main/java/com/c2kernel/lifecycle/instance/Activity.java @@ -129,7 +129,7 @@ public class Activity extends WfVertex /** cf Item request * @throws ObjectNotFoundException * @throws PersistencyException */ - public void request(AgentPath agent, int itemSysKey, int transitionID, String requestData) throws AccessRightsException, InvalidTransitionException, InvalidDataException, ObjectNotFoundException, PersistencyException + public String request(AgentPath agent, int itemSysKey, int transitionID, String requestData) throws AccessRightsException, InvalidTransitionException, InvalidDataException, ObjectNotFoundException, PersistencyException { // Find requested transition @@ -203,6 +203,8 @@ public class Activity extends WfVertex //refresh all the job lists pushJobsToAgents(itemSysKey); + + return outcome; } protected String runActivityLogic(AgentPath agent, int itemSysKey, diff --git a/src/main/java/com/c2kernel/lifecycle/instance/CompositeActivity.java b/src/main/java/com/c2kernel/lifecycle/instance/CompositeActivity.java index 016298f..e6d1bf9 100644 --- a/src/main/java/com/c2kernel/lifecycle/instance/CompositeActivity.java +++ b/src/main/java/com/c2kernel/lifecycle/instance/CompositeActivity.java @@ -413,12 +413,12 @@ public class CompositeActivity extends Activity } @Override - public void request(AgentPath agent, int itemSysKey, int transitionID, String requestData) throws AccessRightsException, InvalidTransitionException, InvalidDataException, ObjectNotFoundException, PersistencyException + public String request(AgentPath agent, int itemSysKey, int transitionID, String requestData) throws AccessRightsException, InvalidTransitionException, InvalidDataException, ObjectNotFoundException, PersistencyException { if (getChildrenGraphModel().getStartVertex() != null && !getStateMachine().getState(state).isFinished() && transitionID == CompositeActivity.START) ((WfVertex) getChildrenGraphModel().getStartVertex()).run(agent, itemSysKey); - super.request(agent, itemSysKey, transitionID, requestData); + return super.request(agent, itemSysKey, transitionID, requestData); } public void refreshJobs(int itemSysKey) diff --git a/src/main/java/com/c2kernel/lifecycle/instance/Workflow.java b/src/main/java/com/c2kernel/lifecycle/instance/Workflow.java index fa5e66b..8ff2fe2 100644 --- a/src/main/java/com/c2kernel/lifecycle/instance/Workflow.java +++ b/src/main/java/com/c2kernel/lifecycle/instance/Workflow.java @@ -99,12 +99,12 @@ public class Workflow extends CompositeActivity implements C2KLocalObject * @throws PersistencyException */ //requestData is xmlstring - public void requestAction(AgentPath agent, String stepPath, int itemSysKey, int transitionID, String requestData) + public String requestAction(AgentPath agent, String stepPath, int itemSysKey, int transitionID, String requestData) throws ObjectNotFoundException, AccessRightsException, InvalidTransitionException, InvalidDataException, ObjectAlreadyExistsException, PersistencyException { Logger.msg(3, "Action: " + transitionID + " " + stepPath + " by " + agent.getAgentName()); if (search(stepPath) != null) - ((Activity) search(stepPath)).request(agent, itemSysKey, transitionID, requestData); + return ((Activity) search(stepPath)).request(agent, itemSysKey, transitionID, requestData); else throw new ObjectNotFoundException(stepPath + " not found", ""); } -- cgit v1.2.3