From b086f57f56bf0eb9dab9cf321a0f69aaaae84347 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 30 May 2012 08:37:45 +0200 Subject: Initial Maven Conversion --- source/com/c2kernel/entity/proxy/AgentProxy.java | 302 ------------------ .../entity/proxy/DomainPathSubscriber.java | 18 -- source/com/c2kernel/entity/proxy/EntityProxy.java | 248 --------------- .../c2kernel/entity/proxy/EntityProxyManager.java | 339 --------------------- .../c2kernel/entity/proxy/EntityProxyObserver.java | 27 -- source/com/c2kernel/entity/proxy/ItemProxy.java | 213 ------------- .../c2kernel/entity/proxy/MemberSubscription.java | 118 ------- .../entity/proxy/ProxyClientConnection.java | 185 ----------- source/com/c2kernel/entity/proxy/ProxyMessage.java | 102 ------- .../entity/proxy/ProxyServerConnection.java | 134 -------- 10 files changed, 1686 deletions(-) delete mode 100644 source/com/c2kernel/entity/proxy/AgentProxy.java delete mode 100644 source/com/c2kernel/entity/proxy/DomainPathSubscriber.java delete mode 100644 source/com/c2kernel/entity/proxy/EntityProxy.java delete mode 100644 source/com/c2kernel/entity/proxy/EntityProxyManager.java delete mode 100644 source/com/c2kernel/entity/proxy/EntityProxyObserver.java delete mode 100644 source/com/c2kernel/entity/proxy/ItemProxy.java delete mode 100644 source/com/c2kernel/entity/proxy/MemberSubscription.java delete mode 100644 source/com/c2kernel/entity/proxy/ProxyClientConnection.java delete mode 100644 source/com/c2kernel/entity/proxy/ProxyMessage.java delete mode 100644 source/com/c2kernel/entity/proxy/ProxyServerConnection.java (limited to 'source/com/c2kernel/entity/proxy') diff --git a/source/com/c2kernel/entity/proxy/AgentProxy.java b/source/com/c2kernel/entity/proxy/AgentProxy.java deleted file mode 100644 index 72ed088..0000000 --- a/source/com/c2kernel/entity/proxy/AgentProxy.java +++ /dev/null @@ -1,302 +0,0 @@ -/************************************************************************** - * AgentProxy.java - * - * $Revision: 1.37 $ - * $Date: 2005/10/05 07:39:36 $ - * - * Copyright (C) 2001 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -package com.c2kernel.entity.proxy; - -import java.util.Date; -import java.util.Enumeration; - -import com.c2kernel.common.AccessRightsException; -import com.c2kernel.common.InvalidDataException; -import com.c2kernel.common.InvalidTransitionException; -import com.c2kernel.common.ObjectAlreadyExistsException; -import com.c2kernel.common.ObjectNotFoundException; -import com.c2kernel.common.PersistencyException; -import com.c2kernel.entity.Agent; -import com.c2kernel.entity.AgentHelper; -import com.c2kernel.entity.C2KLocalObject; -import com.c2kernel.entity.ManageableEntity; -import com.c2kernel.entity.agent.Job; -import com.c2kernel.lifecycle.instance.predefined.PredefinedStep; -import com.c2kernel.lifecycle.instance.stateMachine.Transitions; -import com.c2kernel.lookup.AgentPath; -import com.c2kernel.lookup.DomainPath; -import com.c2kernel.lookup.EntityPath; -import com.c2kernel.lookup.InvalidEntityPathException; -import com.c2kernel.lookup.Path; -import com.c2kernel.persistency.outcome.OutcomeValidator; -import com.c2kernel.persistency.outcome.Schema; -import com.c2kernel.process.Gateway; -import com.c2kernel.scripting.ErrorInfo; -import com.c2kernel.scripting.Script; -import com.c2kernel.scripting.ScriptingEngineException; -import com.c2kernel.utils.CastorXMLUtility; -import com.c2kernel.utils.LocalObjectLoader; -import com.c2kernel.utils.Logger; - -/****************************************************************************** - * It is a wrapper for the connection and communication with Agent - * It caches data loaded from the Agent to reduce communication - * - * @version $Revision: 1.37 $ $Date: 2005/10/05 07:39:36 $ - * @author $Author: abranson $ - ******************************************************************************/ -public class AgentProxy extends EntityProxy -{ - AgentPath path; - - /************************************************************************** - * Creates an AgentProxy without cache and change notification - **************************************************************************/ - public AgentProxy( org.omg.CORBA.Object ior, - int systemKey) - throws ObjectNotFoundException - { - super(ior, systemKey); - try { - path = new AgentPath(systemKey); - } catch (InvalidEntityPathException e) { - throw new ObjectNotFoundException(); - } - } - - @Override - public ManageableEntity narrow() throws ObjectNotFoundException - { - try { - return AgentHelper.narrow(mIOR); - } catch (org.omg.CORBA.BAD_PARAM ex) { } - throw new ObjectNotFoundException("CORBA Object was not an Agent, or the server is down."); - } - /************************************************************************** - * - * - **************************************************************************/ - public void initialise( String agentProps, String collector ) - throws AccessRightsException, - InvalidDataException, - PersistencyException, - ObjectNotFoundException - { - Logger.msg(7, "AgentProxy::initialise - started"); - - ((Agent)getEntity()).initialise( agentProps ); - } - - public AgentPath getPath() { - return path; - } - - /** - * Executes a job on the given item using this agent. - * - * @param item - item holding this job - * @param job - the job to execute - */ - public void execute(ItemProxy item, Job job) - throws AccessRightsException, - InvalidTransitionException, - ObjectNotFoundException, - InvalidDataException, - PersistencyException, - ObjectAlreadyExistsException - { - OutcomeValidator validator = null; - String scriptName = job.getActPropString("ScriptName"); - Date startTime = new Date(); - Logger.msg(3, "AgentProxy - executing "+job.getStepPath()+" for "+path.getAgentName()); - // get the outcome validator if present - if (job.isOutcomeUsed()) - { - - // get schema info from act props - String schemaName = job.getActPropString("SchemaType"); - int schemaVersion; - try { - schemaVersion = Integer.parseInt(job.getActPropString("SchemaVersion")); - } catch (Exception e) { - throw new InvalidDataException(e.getClass().getName()+" extracing schema version", ""); - } - Logger.msg(5, "AgentProxy - fetching schema "+schemaName+"_"+schemaVersion+" for validation"); - // retrieve schema - Schema schema = LocalObjectLoader.getSchema(schemaName, schemaVersion); - - if (schema == null) - throw new InvalidDataException("Job references outcome type "+schemaName+" version "+schemaVersion+" that does not exist in this centre.", ""); - - try { - validator = OutcomeValidator.getValidator(schema); - } catch (Exception e) { - throw new InvalidDataException("Could not create validator: "+e.getMessage(), ""); - } - } - - if(scriptName != null && scriptName.length() > 0 && - (job.getPossibleTransition() == Transitions.DONE || job.getPossibleTransition() == Transitions.COMPLETE)) { - Logger.msg(3, "AgentProxy - executing script "+scriptName); - try { - - // pre-validate outcome from script if there is one - if (job.getOutcomeString()!= null && validator != null) { - Logger.msg(5, "AgentProxy - validating outcome before script execution"); - String error = validator.validate(job.getOutcomeString()); - if (error.length() > 0) { - Logger.error("Outcome not valid: \n " + error); - throw new InvalidDataException(error, ""); - } - } - - // load script - ErrorInfo scriptErrors = (ErrorInfo)callScript(item, job); - - if (scriptErrors.getFatal()) { - Logger.msg(3, "AgentProxy - fatal script error"); - Logger.error(scriptErrors.getErrors()); - throw new InvalidDataException("Fatal Script Error: \n"+scriptErrors.getErrors(), ""); - } - if (scriptErrors.getErrors().length() > 0) - Logger.warning("Script errors: "+scriptErrors.getErrors()); - } catch (ScriptingEngineException ex) { - Logger.error(ex); - throw new InvalidDataException(ex.getMessage(), ""); - } - } - - if (job.isOutcomeUsed()) { - Logger.msg(3, "AgentProxy - validating outcome"); - String error = validator.validate(job.getOutcomeString()); - if (error.length() > 0) - throw new InvalidDataException(error, ""); - } - - job.setAgentId(getSystemKey()); - Logger.msg(3, "AgentProxy - submitting job to item proxy"); - item.requestAction(job); - if (Logger.doLog(3)) { - Date timeNow = new Date(); - long secsNow = (timeNow.getTime()-startTime.getTime())/1000; - Logger.msg(3, "Execution took "+secsNow+" seconds"); - } - } - - public Object callScript(ItemProxy item, Job job) throws ScriptingEngineException { - Script script = new Script(item, this, job); - return script.execute(); - } - - /** - * Standard execution of jobs. Note that this method should always be the one used from clients - all execution - * parameters are taken from the job where they're probably going to be correct. - * - * @param job - * @throws AccessRightsException - * @throws InvalidDataException - * @throws InvalidTransitionException - * @throws ObjectNotFoundException - * @throws PersistencyException - * @throws ObjectAlreadyExistsException - */ - public void execute(Job job) - throws AccessRightsException, - InvalidDataException, - InvalidTransitionException, - ObjectNotFoundException, - PersistencyException, - ObjectAlreadyExistsException - { - try { - ItemProxy targetItem = (ItemProxy)Gateway.getProxyManager().getProxy(new EntityPath(job.getItemSysKey())); - execute(targetItem, job); - } catch (InvalidEntityPathException e) { - throw new ObjectNotFoundException("Job contained invalid item sysKey: "+job.getItemSysKey(), ""); - } - } - - public void execute(ItemProxy item, String predefStep, C2KLocalObject obj) - throws AccessRightsException, - InvalidDataException, - InvalidTransitionException, - ObjectNotFoundException, - PersistencyException, - ObjectAlreadyExistsException - { - String param; - try { - param = marshall(obj); - } catch (Exception ex) { - Logger.error(ex); - throw new InvalidDataException("Error on marshall", ""); - } - execute(item, predefStep, param); - } - - public void execute(ItemProxy item, String predefStep, String param) - throws AccessRightsException, - InvalidDataException, - InvalidTransitionException, - ObjectNotFoundException, - PersistencyException, - ObjectAlreadyExistsException - { - String[] params = new String[1]; - params[0] = param; - execute(item, predefStep, params); - } - - public void execute(ItemProxy item, String predefStep, String[] params) - throws AccessRightsException, - InvalidDataException, - InvalidTransitionException, - ObjectNotFoundException, - PersistencyException, - ObjectAlreadyExistsException - { - item.requestAction(getSystemKey(), "workflow/predefined/"+predefStep, Transitions.DONE, PredefinedStep.bundleData(params)); - } - - /** Wrappers for scripts */ - public String marshall(Object obj) throws Exception { - return CastorXMLUtility.marshall(obj); - } - - public Object unmarshall(String obj) throws Exception { - return CastorXMLUtility.unmarshall(obj); - } - - /** Let scripts resolve items */ - public ItemProxy searchItem(String name) throws ObjectNotFoundException { - Enumeration results = Gateway.getLDAPLookup().search(new DomainPath(""),name); - - Path returnPath = null; - if (!results.hasMoreElements()) - throw new ObjectNotFoundException(name, ""); - - while(results.hasMoreElements()) { - Path nextMatch = results.nextElement(); - if (returnPath != null && nextMatch.getSysKey() != -1 && returnPath.getSysKey() != nextMatch.getSysKey()) - throw new ObjectNotFoundException("Too many items with that name"); - returnPath = nextMatch; - } - - return (ItemProxy)Gateway.getProxyManager().getProxy(returnPath); - } - - public ItemProxy getItem(String path) throws ObjectNotFoundException { - return (getItem(new DomainPath(path))); - } - - public ItemProxy getItem(DomainPath path) throws ObjectNotFoundException { - return (ItemProxy)Gateway.getProxyManager().getProxy(path); - } - - public ItemProxy getItemBySysKey(int sysKey) throws ObjectNotFoundException, InvalidEntityPathException { - return (ItemProxy)Gateway.getProxyManager().getProxy(new EntityPath(sysKey)); - } -} diff --git a/source/com/c2kernel/entity/proxy/DomainPathSubscriber.java b/source/com/c2kernel/entity/proxy/DomainPathSubscriber.java deleted file mode 100644 index 4089325..0000000 --- a/source/com/c2kernel/entity/proxy/DomainPathSubscriber.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.c2kernel.entity.proxy; - -import com.c2kernel.lookup.DomainPath; - -/************************************************************************** - * - * $Revision: 1.1 $ - * $Date: 2004/02/05 16:11:57 $ - * - * Copyright (C) 2003 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -public interface DomainPathSubscriber { - - public void pathAdded(DomainPath path); - public void pathRemoved(DomainPath path); -} diff --git a/source/com/c2kernel/entity/proxy/EntityProxy.java b/source/com/c2kernel/entity/proxy/EntityProxy.java deleted file mode 100644 index fae2e28..0000000 --- a/source/com/c2kernel/entity/proxy/EntityProxy.java +++ /dev/null @@ -1,248 +0,0 @@ -/************************************************************************** - * EntityProxy.java - * - * $Revision: 1.35 $ - * $Date: 2005/05/10 11:40:09 $ - * - * Copyright (C) 2001 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -package com.c2kernel.entity.proxy; - -import java.util.HashMap; -import java.util.Iterator; - -import com.c2kernel.common.ObjectNotFoundException; -import com.c2kernel.entity.C2KLocalObject; -import com.c2kernel.entity.ManageableEntity; -import com.c2kernel.persistency.ClusterStorageException; -import com.c2kernel.process.Gateway; -import com.c2kernel.property.Property; -import com.c2kernel.utils.CastorXMLUtility; -import com.c2kernel.utils.Logger; - - -/****************************************************************************** -* It is a wrapper for the connection and communication with Entities. -* It can cache data loaded from the Entity to reduce communication with it. -* This cache is syncronised with corresponding Entity through an event mechanism. -* -* @version $Revision: 1.35 $ $Date: 2005/05/10 11:40:09 $ -* @author $Author: abranson $ -******************************************************************************/ - -abstract public class EntityProxy implements ManageableEntity -{ - - protected ManageableEntity mEntity = null; - protected org.omg.CORBA.Object mIOR; - protected int mSystemKey; - private HashMap, EntityProxyObserver> mSubscriptions; - - /************************************************************************** - * - **************************************************************************/ - protected EntityProxy( org.omg.CORBA.Object ior, - int systemKey) - throws ObjectNotFoundException - { - Logger.msg(8,"EntityProxy::EntityProxy() - Initialising '" +systemKey+ "' entity"); - - initialise( ior, systemKey); - } - - /************************************************************************** - * - **************************************************************************/ - private void initialise( org.omg.CORBA.Object ior, - int systemKey) - throws ObjectNotFoundException - { - Logger.msg(8, "EntityProxy::initialise() - Initialising '" +systemKey+ "' entity"); - - mIOR = ior; - mSystemKey = systemKey; - mSubscriptions = new HashMap, EntityProxyObserver>(); - } - - - /************************************************************************** - * - **************************************************************************/ - public ManageableEntity getEntity() throws ObjectNotFoundException - { - if (mEntity == null) { - mEntity = narrow(); - } - return mEntity; - } - - abstract public ManageableEntity narrow() throws ObjectNotFoundException; - - /************************************************************************** - * - **************************************************************************/ - //check who is using.. and if toString() is sufficient - @Override - public int getSystemKey() - { - return mSystemKey; - } - - - /************************************************************************** - * - **************************************************************************/ - @Override - public String queryData( String path ) - throws ObjectNotFoundException - { - - try { - Logger.msg(7, "EntityProxy.queryData() - "+mSystemKey+"/"+path); - if (path.endsWith("all")) { - Logger.msg(7, "EntityProxy.queryData() - listing contents"); - String[] result = Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()-3)); - StringBuffer retString = new StringBuffer(); - for (int i = 0; i < result.length; i++) { - retString.append(result[i]); - if (i"+e.getMessage()+""; - } - } - - public String[] getContents( String path ) throws ObjectNotFoundException { - try { - return Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length())); - } catch (ClusterStorageException e) { - throw new ObjectNotFoundException(e.toString()); - } - } - - - /************************************************************************** - * - **************************************************************************/ - public C2KLocalObject getObject( String xpath ) - throws ObjectNotFoundException - { - // load from storage, falling back to proxy loader if not found in others - try - { - return Gateway.getStorage().get( mSystemKey, xpath , null); - } - catch( ClusterStorageException ex ) - { - Logger.msg(4, "Exception loading object :"+mSystemKey+"/"+xpath); - throw new ObjectNotFoundException( ex.toString() ); - } - } - - - - public String getProperty( String name ) - throws ObjectNotFoundException - { - Logger.msg(5, "Get property "+name+" from syskey/"+mSystemKey); - Property prop = (Property)getObject("Property/"+name); - try - { - return prop.getValue(); - } - catch (NullPointerException ex) - { - throw new ObjectNotFoundException(); - } - } - - public String getName() - { - try { - return getProperty("Name"); - } catch (ObjectNotFoundException ex) { - return null; - } - } - - - /************************************************************************** - * Subscription methods - **************************************************************************/ - - public void subscribe (MemberSubscription newSub) { - - newSub.setSubject(this); - synchronized (this){ - mSubscriptions.put( newSub, newSub.getObserver() ); - } - new Thread(newSub).start(); - Logger.msg(7, "Subscribed "+newSub.getObserver().getClass().getName()+" for "+newSub.interest); - } - - public void unsubscribe(EntityProxyObserver observer) - { - synchronized (this){ - for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { - MemberSubscription thisSub = e.next(); - if (mSubscriptions.get( thisSub ) == observer) { - e.remove(); - Logger.msg(7, "Unsubscribed "+observer.getClass().getName()); - } - } - } - } - - public void dumpSubscriptions(int logLevel) { - if (mSubscriptions.size() == 0) return; - Logger.msg(logLevel, "Subscriptions to proxy "+mSystemKey+":"); - synchronized(this) { - for (MemberSubscription element : mSubscriptions.keySet()) { - EntityProxyObserver obs = element.getObserver(); - if (obs != null) - Logger.msg(logLevel, " "+element.getObserver().getClass().getName()+" subscribed to "+element.interest); - else - Logger.msg(logLevel, " Phantom subscription to "+element.interest); - } - } - } - - public void notify(ProxyMessage message) { - Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey); - synchronized (this){ - if (!message.getServer().equals(EntityProxyManager.serverName)) - Gateway.getStorage().clearCache(mSystemKey, message.getPath()); - for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { - MemberSubscription newSub = e.next(); - if (newSub.getObserver() == null) { // phantom - Logger.msg(4, "Removing phantom subscription to "+newSub.interest); - e.remove(); - } - else - newSub.update(message.getPath(), message.getState()); - } - } - } - - /** - * If this is reaped, clear out the cache for it too. - */ - @Override - protected void finalize() throws Throwable { - Logger.msg(7, "Proxy "+mSystemKey+" reaped"); - Gateway.getStorage().clearCache(mSystemKey, null); - Gateway.getProxyManager().removeProxy(mSystemKey); - super.finalize(); - } - -} diff --git a/source/com/c2kernel/entity/proxy/EntityProxyManager.java b/source/com/c2kernel/entity/proxy/EntityProxyManager.java deleted file mode 100644 index 192a984..0000000 --- a/source/com/c2kernel/entity/proxy/EntityProxyManager.java +++ /dev/null @@ -1,339 +0,0 @@ -/************************************************************************** - * EntityProxyFactory.java - * - * $Revision: 1.45 $ - * $Date: 2005/05/10 11:40:09 $ - * - * Copyright (C) 2001 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -package com.c2kernel.entity.proxy; - -import java.util.ArrayList; -import java.util.ConcurrentModificationException; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Iterator; - -import com.c2kernel.common.InvalidDataException; -import com.c2kernel.common.ObjectNotFoundException; -import com.c2kernel.lookup.AgentPath; -import com.c2kernel.lookup.DomainPath; -import com.c2kernel.lookup.Path; -import com.c2kernel.persistency.ClusterStorage; -import com.c2kernel.process.Gateway; -import com.c2kernel.property.Property; -import com.c2kernel.utils.Logger; -import com.c2kernel.utils.SoftCache; -import com.c2kernel.utils.server.SimpleTCPIPServer; - - -public class EntityProxyManager -{ - SoftCache proxyPool = new SoftCache(50); - HashMap treeSubscribers = new HashMap(); - HashMap connections = new HashMap(); - - // server objects - static ArrayList proxyClients = new ArrayList(); - static SimpleTCPIPServer proxyServer = null; - static String serverName = null; - - /** - * Create an entity proxy manager to listen for proxy events and reap unused proxies - */ - public EntityProxyManager() - { - Logger.msg(5, "EntityProxyManager - Starting....."); - - Enumeration servers = Gateway.getLDAPLookup().searchEntities(new DomainPath("/servers")); - while(servers.hasMoreElements()) { - Path thisServerPath = servers.nextElement(); - try { - int syskey = thisServerPath.getSysKey(); - String remoteServer = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/Name", null)).getValue(); - String portStr = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/ProxyPort", null)).getValue(); - int remotePort = Integer.parseInt(portStr); - connectToProxyServer(remoteServer, remotePort); - - } catch (Exception ex) { - Logger.error("Exception retrieving proxy server connection data for "+thisServerPath); - Logger.error(ex); - } - } - } - - public void connectToProxyServer(String name, int port) { - ProxyServerConnection oldConn = connections.get(name); - if (oldConn != null) - oldConn.shutdown(); - connections.put(name, new ProxyServerConnection(name, port, this)); - } - - - protected void resubscribe(ProxyServerConnection conn) { - synchronized (proxyPool) { - for (Integer key : proxyPool.keySet()) { - ProxyMessage sub = new ProxyMessage(key.intValue(), ProxyMessage.ADDPATH, false); - Logger.msg(5, "Subscribing to entity "+key); - conn.sendMessage(sub); - } - } - } - - /** - * @param sub - */ - private void sendMessage(ProxyMessage sub) { - for (ProxyServerConnection element : connections.values()) { - element.sendMessage(sub); - } - - } - - public void shutdown() { - Logger.msg("EntityProxyManager.shutdown() - flagging shutdown of server connections"); - for (ProxyServerConnection element : connections.values()) { - element.shutdown(); - } - } - - protected void processMessage(ProxyMessage thisMessage) throws InvalidDataException { - if (Logger.doLog(9)) Logger.msg(9, thisMessage.toString()); - - if (thisMessage.getPath().equals(ProxyMessage.PINGPATH)) // ping response - return; - - if (thisMessage.getSysKey() == ProxyMessage.NA) // must be domain path info - informTreeSubscribers(thisMessage.getState(), thisMessage.getPath()); - else { - // proper proxy message - Logger.msg(5, "Received proxy message: "+thisMessage.toString()); - Integer key = new Integer(thisMessage.getSysKey()); - EntityProxy relevant = proxyPool.get(key); - if (relevant == null) - Logger.warning("Received proxy message for sysKey "+thisMessage.getSysKey()+" which we don't have a proxy for."); - else - try { - relevant.notify(thisMessage); - } catch (Throwable ex) { - Logger.error("Error caught notifying proxy listener "+relevant.toString()+" of "+thisMessage.toString()); - Logger.error(ex); - } - } - } - - private void informTreeSubscribers(boolean state, String path) { - DomainPath last = new DomainPath(path); - DomainPath parent; boolean first = true; - synchronized(treeSubscribers) { - while((parent = last.getParent()) != null) { - - for (DomainPathSubscriber sub : treeSubscribers.keySet()) { - DomainPath interest = treeSubscribers.get(sub); - if (interest.equals(parent)) { - if (state == ProxyMessage.ADDED) - sub.pathAdded(last); - else if (first) - sub.pathRemoved(last); - } - } - last = parent; - first = false; - } - } - } - - public void subscribeTree(DomainPathSubscriber sub, DomainPath interest) { - synchronized(treeSubscribers) { - treeSubscribers.put(sub, interest); - } - } - - public void unsubscribeTree(DomainPathSubscriber sub) { - synchronized(treeSubscribers) { - treeSubscribers.remove(sub); - } - } - - /************************************************************************** - * - **************************************************************************/ - private EntityProxy createProxy( org.omg.CORBA.Object ior, - int systemKey, - boolean isItem ) - throws ObjectNotFoundException - { - - EntityProxy newProxy = null; - - Logger.msg(5, "EntityProxyFactory::creating proxy on entity " + systemKey); - - if( isItem ) - { - newProxy = new ItemProxy(ior, systemKey); - } - else - { - newProxy = new AgentProxy(ior, systemKey); - } - - // subscribe to changes from server - ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.ADDPATH, false); - sendMessage(sub); - reportCurrentProxies(9); - return ( newProxy ); - } - - protected void removeProxy( int systemKey ) - { - ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.DELPATH, true); - Logger.msg(5,"EntityProxyManager.removeProxy() - Unsubscribing to proxy informer for "+systemKey); - sendMessage(sub); - } - - - /************************************************************************** - * EntityProxy getProxy( ManageableEntity, SystemKey) - * - * Called by the other GetProxy methods. Fills in either the ior or the - * SystemKey - **************************************************************************/ - private EntityProxy getProxy( org.omg.CORBA.Object ior, - int systemKey, - boolean isItem ) - throws ObjectNotFoundException - { - Integer key = new Integer(systemKey); - - synchronized(proxyPool) { - EntityProxy newProxy; - // return it if it exists - newProxy = proxyPool.get(key); - if (newProxy == null) { - // create a new one - newProxy = createProxy(ior, systemKey, isItem ); - proxyPool.put(key, newProxy); - } - return newProxy; - - } - } - - /************************************************************************** - * EntityProxy getProxy( String ) - * - * Proxy from Alias - **************************************************************************/ - public EntityProxy getProxy( Path path ) - throws ObjectNotFoundException - { - - //convert namePath to dn format - Logger.msg(8,"EntityProxyFactory::getProxy(" + path.toString() + ")"); - boolean isItem = !(path.getEntity() instanceof AgentPath); - return getProxy( Gateway.getLDAPLookup().getIOR(path), - path.getSysKey(), - isItem ); - - } - - /************************************************************************** - * void reportCurrentProxies() - * - * A utility to Dump the current proxies loaded - **************************************************************************/ - public void reportCurrentProxies(int logLevel) - { - if (!Logger.doLog(logLevel)) return; - Logger.msg(logLevel, "Current proxies: "); - try { - synchronized(proxyPool) { - Iterator i = proxyPool.keySet().iterator(); - - for( int count=0; i.hasNext(); count++ ) - { - Integer nextProxy = i.next(); - EntityProxy thisProxy = proxyPool.get(nextProxy); - if (thisProxy != null) - Logger.msg(logLevel, - "" + count + ": " - + proxyPool.get(nextProxy).getClass().getName() - + ": " + nextProxy); - } - } - } catch (ConcurrentModificationException ex) { - Logger.msg(logLevel, "Proxy cache modified. Aborting."); - } - } - - - /************************************************************************** - * Static Proxy Server methods - **************************************************************************/ - - /** - * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops - * @param c2kProps - */ - public static void initServer() - { - Logger.msg(5, "EntityProxyFactory::initServer - Starting....."); - String port = Gateway.getProperty("ItemServer.Proxy.port"); - serverName = Gateway.getProperty("ItemServer.name"); - if (port == null) { - Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of entity changes."); - return; - } - - // set up the proxy server - try { - int portNo = Integer.parseInt(port); - Logger.msg(5, "EntityProxyFactory::initServer - Initialising proxy informer on port "+port); - proxyServer = new SimpleTCPIPServer(portNo, ProxyClientConnection.class, 200); - proxyServer.startListening(); - } catch (Exception ex) { - Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of entity changes."); - Logger.error(ex); - } - } - - public static void sendProxyEvent(ProxyMessage message) { - if (proxyServer != null && message.getPath() != null) - synchronized(proxyClients) { - for (ProxyClientConnection client : proxyClients) { - client.sendMessage(message); - } - } - } - - public static void reportConnections(int logLevel) { - synchronized(proxyClients) { - Logger.msg(logLevel, "Currently connected proxy clients:"); - for (ProxyClientConnection client : proxyClients) { - Logger.msg(logLevel, " "+client); - } - } - } - - public static void shutdownServer() { - if (proxyServer != null) { - Logger.msg(1, "EntityProxyManager: Closing Server."); - proxyServer.stopListening(); - } - } - - public static void registerProxyClient(ProxyClientConnection client) { - synchronized(proxyClients) { - proxyClients.add(client); - } - } - - public static void unRegisterProxyClient(ProxyClientConnection client) { - synchronized(proxyClients) { - proxyClients.remove(client); - } - } -} - diff --git a/source/com/c2kernel/entity/proxy/EntityProxyObserver.java b/source/com/c2kernel/entity/proxy/EntityProxyObserver.java deleted file mode 100644 index 3ddb99c..0000000 --- a/source/com/c2kernel/entity/proxy/EntityProxyObserver.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.c2kernel.entity.proxy; - -import com.c2kernel.entity.C2KLocalObject; - - - -public interface EntityProxyObserver -{ - /************************************************************************** - * Subscribed items are broken apart and fed one by one to these methods. - * Replacement after an event is done by feeding the new memberbase with the same id. - * ID could be an XPath? - **************************************************************************/ - public void add(V contents); - - /************************************************************************** - * the 'type' parameter should be an indication of the type of object - * supplied so that the subscriber can associate the call back with - * one of its subscriptions. If we go with an Xpath subscription form, - * then the id will probably be sufficient. - * Should be comparable (substring whatever) with the parameter given to - * the subscribe method of ItemProxy. - **************************************************************************/ - public void remove(String id); - - public void control(String control, String msg); -} diff --git a/source/com/c2kernel/entity/proxy/ItemProxy.java b/source/com/c2kernel/entity/proxy/ItemProxy.java deleted file mode 100644 index 658e0c8..0000000 --- a/source/com/c2kernel/entity/proxy/ItemProxy.java +++ /dev/null @@ -1,213 +0,0 @@ -/************************************************************************** - * ItemProxy.java - * - * $Revision: 1.25 $ - * $Date: 2005/05/10 11:40:09 $ - * - * Copyright (C) 2001 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -package com.c2kernel.entity.proxy; - -import java.util.ArrayList; - -import com.c2kernel.common.AccessRightsException; -import com.c2kernel.common.InvalidDataException; -import com.c2kernel.common.InvalidTransitionException; -import com.c2kernel.common.ObjectAlreadyExistsException; -import com.c2kernel.common.ObjectNotFoundException; -import com.c2kernel.common.PersistencyException; -import com.c2kernel.entity.Item; -import com.c2kernel.entity.ItemHelper; -import com.c2kernel.entity.ManageableEntity; -import com.c2kernel.entity.agent.Job; -import com.c2kernel.entity.agent.JobArrayList; -import com.c2kernel.lifecycle.instance.stateMachine.Transitions; -import com.c2kernel.utils.CastorXMLUtility; -import com.c2kernel.utils.Logger; - -/****************************************************************************** - * It is a wrapper for the connection and communication with Item - * It caches data loaded from the Item to reduce communication - * - * @version $Revision: 1.25 $ $Date: 2005/05/10 11:40:09 $ - * @author $Author: abranson $ - ******************************************************************************/ -public class ItemProxy extends EntityProxy -{ - - /************************************************************************** - * - **************************************************************************/ - protected ItemProxy( org.omg.CORBA.Object ior, - int systemKey) - throws ObjectNotFoundException - { - super(ior, systemKey); - - } - - @Override - public ManageableEntity narrow() throws ObjectNotFoundException - { - try { - return ItemHelper.narrow(mIOR); - } catch (org.omg.CORBA.BAD_PARAM ex) { } - throw new ObjectNotFoundException("CORBA Object was not an Item, or the server is down."); - } - /************************************************************************** - * - * - **************************************************************************/ - public void initialise( int agentId, - String itemProps, - String workflow ) - throws AccessRightsException, - InvalidDataException, - PersistencyException, - ObjectNotFoundException - { - Logger.msg(7, "ItemProxy::initialise - started"); - - ((Item)getEntity()).initialise( agentId, itemProps, workflow ); - } - - public void setProperty(AgentProxy agent, String name, String value) - throws AccessRightsException, - PersistencyException - { - String[] params = new String[2]; - params[0] = name; - params[1] = value; - try { - agent.execute(this, "WriteProperty", params); - } catch (AccessRightsException e) { - throw (e); - } catch (PersistencyException e) { - throw (e); - } catch (Exception e) { - Logger.error(e); - throw new PersistencyException("Could not store property"); - } - } - /************************************************************************** - * - **************************************************************************/ - protected void requestAction( Job thisJob ) - throws AccessRightsException, - InvalidTransitionException, - ObjectNotFoundException, - InvalidDataException, - PersistencyException, - ObjectAlreadyExistsException - { - String outcome = thisJob.getOutcomeString(); - // check fields that should have been filled in - if (outcome==null) - if (thisJob.isOutcomeUsed()) - throw new InvalidDataException("Outcome is required.", ""); - else - outcome=""; - - if (thisJob.getAgentId() == -1) - throw new InvalidDataException("No Agent specified.", ""); - - Logger.msg(7, "ItemProxy - executing "+thisJob.getStepPath()+" for "+thisJob.getAgentName()); - requestAction (thisJob.getAgentId(), thisJob.getStepPath(), - thisJob.getPossibleTransition(), outcome); - } - - //requestData is xmlString - public void requestAction( int agentId, - String stepPath, - int transitionID, - String requestData - ) - throws AccessRightsException, - InvalidTransitionException, - ObjectNotFoundException, - InvalidDataException, - PersistencyException, - ObjectAlreadyExistsException - { - ((Item)getEntity()).requestAction( agentId, - stepPath, - transitionID, - requestData ); - } - - /************************************************************************** - * - **************************************************************************/ - public String queryLifeCycle( int agentId, - boolean filter - ) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException - { - return ((Item)getEntity()).queryLifeCycle( agentId, - filter ); - } - - - /************************************************************************** - * - **************************************************************************/ - private ArrayList getJobList(int agentId, boolean filter) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException - { - JobArrayList thisJobList; - try { - String jobs = queryLifeCycle(agentId, filter); - thisJobList = (JobArrayList)CastorXMLUtility.unmarshall(jobs); - } - catch (Exception e) { - Logger.error(e); - throw new PersistencyException("Exception::ItemProxy::getJobList() - Cannot unmarshall the jobs", null); - } - return thisJobList.list; - } - - public ArrayList getJobList(AgentProxy agent) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException - { - return getJobList(agent.getSystemKey()); - } - - private ArrayList getJobList(int agentId) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException - { - return getJobList(agentId, true); - } - - private Job getJobByName(String actName, int agentId) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException { - - ArrayList jobList = getJobList(agentId); - for (Job job : jobList) { - int transition = job.getPossibleTransition(); - if (job.getStepName().equals(actName)) - if (transition == Transitions.COMPLETE || transition == Transitions.DONE) - return job; - } - return null; - - } - - public Job getJobByName(String actName, AgentProxy agent) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException { - return getJobByName(actName, agent.getSystemKey()); - } -} diff --git a/source/com/c2kernel/entity/proxy/MemberSubscription.java b/source/com/c2kernel/entity/proxy/MemberSubscription.java deleted file mode 100644 index 157297f..0000000 --- a/source/com/c2kernel/entity/proxy/MemberSubscription.java +++ /dev/null @@ -1,118 +0,0 @@ - -package com.c2kernel.entity.proxy; -import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.StringTokenizer; - -import com.c2kernel.common.ObjectNotFoundException; -import com.c2kernel.entity.C2KLocalObject; -import com.c2kernel.utils.Logger; - -public class MemberSubscription implements Runnable { - public static final String ERROR = "Error"; - public static final String END = "theEND"; - - EntityProxy subject; - String interest; - // keep the subscriber by weak reference, so it is not kept from the garbage collector if no longer used - WeakReference> observerReference; - ArrayList contents = new ArrayList(); - boolean preLoad; - - public MemberSubscription(EntityProxyObserver observer, String interest, boolean preLoad) { - setObserver(observer); - this.interest = interest; - this.preLoad = preLoad; - } - - @Override - public void run() { - Thread.currentThread().setName("Member Subscription: "+subject.getSystemKey()+":"+interest); - if (preLoad) loadChildren(); - } - - private void loadChildren() { - C newMember; - EntityProxyObserver observer = getObserver(); - if (observer == null) return; //reaped - try { - // fetch contents of path - String children = subject.queryData(interest+"/all"); - StringTokenizer tok = new StringTokenizer(children, ","); - ArrayList newContents = new ArrayList(); - while (tok.hasMoreTokens()) - newContents.add(tok.nextToken()); - - // look to see what's new - for (String newChild: newContents) { - - // load child object - try { - newMember = (C)subject.getObject(interest+"/"+newChild); - contents.remove(newChild); - observer.add(newMember); - } catch (ObjectNotFoundException ex) { - observer.control(ERROR, "Listed member "+newChild+" was not found."); - } - } - // report what's left in old contents as deleted - for (String oldChild: contents) { - observer.remove(interest+"/"+oldChild); - } - //replace contents arraylist - contents = newContents; - //report that we're done - observer.control(END, null); - } catch (Exception ex) { - observer.control(ERROR, "Query on "+interest+" failed with "+ex); - } - } - - public boolean isRelevant(String path) { - Logger.msg(7, "Checking relevance of "+path+" to "+interest); - return (path.startsWith(interest)); - } - - public void update(String path, boolean deleted) { - EntityProxyObserver observer = getObserver(); - if (observer == null) return; //reaped - Logger.msg(7, "Processing proxy message path "+path +" for "+observer+". Interest: "+interest+" Was Deleted:"+deleted); - if (!path.startsWith(interest)) // doesn't concern us - return; - - if (path.equals(interest)) // refresh contents - loadChildren(); - else { - String name = path.substring(interest.length()); - if (deleted) { - Logger.msg(4, "Removing "+path); - contents.remove(name); - observer.remove(name); - } - else { - try { - C newMember = (C)subject.getObject(path); - Logger.msg(4, "Adding "+path); - contents.add(name); - observer.add(newMember); - } catch (ObjectNotFoundException e) { - Logger.error("Member Subscription: could not load "+path); - Logger.error(e); - } - } - } - } - - public void setObserver(EntityProxyObserver observer) { - observerReference = new WeakReference>(observer); - } - - public void setSubject(EntityProxy subject) { - this.subject = subject; - } - - public EntityProxyObserver getObserver() { - return observerReference.get(); - } -} - diff --git a/source/com/c2kernel/entity/proxy/ProxyClientConnection.java b/source/com/c2kernel/entity/proxy/ProxyClientConnection.java deleted file mode 100644 index 9687f22..0000000 --- a/source/com/c2kernel/entity/proxy/ProxyClientConnection.java +++ /dev/null @@ -1,185 +0,0 @@ -package com.c2kernel.entity.proxy; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.InterruptedIOException; -import java.io.PrintWriter; -import java.net.Socket; -import java.net.SocketException; -import java.util.ArrayList; -import java.util.Iterator; - -import com.c2kernel.common.InvalidDataException; -import com.c2kernel.utils.Logger; -import com.c2kernel.utils.server.SocketHandler; - -/************************************************************************** - * - * $Revision: 1.18 $ - * $Date: 2005/05/10 11:40:09 $ - * - * Copyright (C) 2003 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -public class ProxyClientConnection implements SocketHandler { - - Socket clientSocket = null; - static int clientId = -1; - int thisClientId; - ArrayList sysKeys; - PrintWriter response; - BufferedReader request; - boolean closing = false; - - public ProxyClientConnection() { - super(); - thisClientId = ++clientId; - EntityProxyManager.registerProxyClient(this); - Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready."); - } - - - @Override - public String getName() { - return "Proxy Client Connection"; - } - - @Override - public boolean isBusy() { - return clientSocket != null; - } - - @Override - public synchronized void setSocket(Socket newSocket) { - try { - Logger.msg(1, "Proxy Client Connection "+thisClientId+" connect from "+newSocket.getInetAddress()+":"+newSocket.getPort()); - newSocket.setSoTimeout(500); - clientSocket = newSocket; - response = new PrintWriter(clientSocket.getOutputStream(), true); - sysKeys = new ArrayList(); - } catch (SocketException ex) { - Logger.msg("Could not set socket timeout:"); - Logger.error(ex); - closeSocket(); - } catch (IOException ex) { - Logger.msg("Could not setup output stream:"); - Logger.error(ex); - closeSocket(); - } - } - - /** - * Main loop. Reads proxy commands from the client and acts on them. - */ - @Override - public void run() { - Thread.currentThread().setName("Proxy Client Connection: "+clientSocket.getInetAddress()); - Logger.msg(7, "ProxyClientConnection "+thisClientId+" - Setting up proxy client connection with "+clientSocket.getInetAddress()); - try { - request = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); - String input = null; - ProxyMessage thisMessage; - while (clientSocket != null) { - try { - input = request.readLine(); - Logger.msg(9, "ProxyClientConnection "+thisClientId+" - received "+input); - thisMessage = new ProxyMessage(input); - processMessage(thisMessage); - } catch (InterruptedIOException ex) { //timeout - } catch (InvalidDataException ex) { // invalid proxy message - Logger.error("ProxyClientConnection "+thisClientId+" - Invalid proxy message: "+input); - } - - } - } catch (IOException ex) { - if (!closing) - Logger.error("ProxyClientConnection "+thisClientId+" - Error reading from socket."); - } - closeSocket(); - Logger.msg(1, "ProxyClientConnection "+thisClientId+" closed."); - } - - private void processMessage(ProxyMessage message) throws InvalidDataException { - - // proxy disconnection - if (message.getPath().equals(ProxyMessage.BYEPATH)) { - Logger.msg(7, "ProxyClientConnection "+thisClientId+" disconnecting"); - closeSocket(); - } - - // proxy checking connection - else if (message.getPath().equals(ProxyMessage.PINGPATH)) - response.println(ProxyMessage.pingMessage); - - // new subscription to entity changes - else if (message.getPath().equals(ProxyMessage.ADDPATH)) { - Logger.msg(7, "ProxyClientConnection "+thisClientId+" subscribed to "+message.getSysKey()); - synchronized (sysKeys) { - sysKeys.add(new Integer(message.getSysKey())); - } - } - - // remove of subscription to entity changes - else if (message.getPath().equals(ProxyMessage.DELPATH)) { - synchronized (sysKeys) { - sysKeys.remove(new Integer(message.getSysKey())); - } - Logger.msg(7, "ProxyClientConnection "+thisClientId+" unsubscribed from "+message.getSysKey()); - } - - else // unknown message - Logger.error("ProxyClientConnection "+thisClientId+" - Unknown message type: "+message); - - } - - public synchronized void sendMessage(ProxyMessage message) { - if (clientSocket==null) return; // idle - boolean relevant = message.getSysKey() == ProxyMessage.NA; - synchronized (sysKeys) { - for (Iterator iter = sysKeys.iterator(); iter.hasNext() && !relevant;) { - Integer thisKey = iter.next(); - if (thisKey.intValue() == message.getSysKey()) - relevant = true; - } - } - if (!relevant) return; // not for our client - - response.println(message); - } - - @Override - public void shutdown() { - if (isBusy()) { - closing = true; - Logger.msg("ProxyClientConnection "+thisClientId+" closing."); - closeSocket(); - } - } - - @Override - public String toString() { - if (clientSocket == null) return thisClientId+": idle"; - else return thisClientId+": "+clientSocket.getInetAddress(); - } - - private synchronized void closeSocket() { - if (clientSocket==null) return; - try { - request.close(); - response.close(); - clientSocket.close(); - } catch (IOException e) { - Logger.error("ProxyClientConnection "+thisClientId+" - Could not close socket."); - Logger.error(e); - } - synchronized (sysKeys) { - sysKeys = null; - } - - clientSocket = null; - - } - -} diff --git a/source/com/c2kernel/entity/proxy/ProxyMessage.java b/source/com/c2kernel/entity/proxy/ProxyMessage.java deleted file mode 100644 index 62866eb..0000000 --- a/source/com/c2kernel/entity/proxy/ProxyMessage.java +++ /dev/null @@ -1,102 +0,0 @@ -package com.c2kernel.entity.proxy; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.util.StringTokenizer; - -import com.c2kernel.common.InvalidDataException; - - -/************************************************************************** - * - * $Revision: 1.11 $ - * $Date: 2005/05/10 11:40:09 $ - * - * Copyright (C) 2003 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -public class ProxyMessage { - - // special server message paths - public static final String BYEPATH = "bye"; - public static final String ADDPATH = "add"; - public static final String DELPATH = "del"; - public static final String PINGPATH = "ping"; - public static final boolean ADDED = false; - public static final boolean DELETED = true; - public static final int NA = -1; - - static ProxyMessage byeMessage = new ProxyMessage(NA, BYEPATH, ADDED); - static ProxyMessage pingMessage = new ProxyMessage(NA, PINGPATH, ADDED); - - private int sysKey = NA; - private String path = ""; - private String server = null; - private boolean state = ADDED; - - public ProxyMessage() { - super(); - } - public ProxyMessage(int sysKey, String path, boolean state) { - this(); - setSysKey(sysKey); - setPath(path); - setState(state); - } - - public ProxyMessage(String line) throws InvalidDataException, IOException { - if (line == null) - throw new IOException("Null proxy message"); - StringTokenizer tok = new StringTokenizer(line,":"); - if (tok.countTokens()!=2) - throw new InvalidDataException("String '"+line+"' does not constitute a valid proxy message.", ""); - sysKey = Integer.parseInt(tok.nextToken()); - path = tok.nextToken(); - if (path.startsWith("-")) { - state = DELETED; - path = path.substring(1); - } - } - - public ProxyMessage(DatagramPacket packet) throws InvalidDataException, IOException { - this(new String(packet.getData())); - } - - public int getSysKey() { - return sysKey; - } - - public void setSysKey(int sysKey) { - this.sysKey = sysKey; - } - - public String getPath() { - return path; - } - - public void setPath(String newPath) { - this.path = newPath; - } - - public boolean getState() { - return state; - } - - public void setState(boolean state) { - this.state = state; - } - - @Override - public String toString() { - return sysKey+":"+(state?"-":"")+path; - } - - public String getServer() { - return server; - } - - public void setServer(String server) { - this.server = server; - } -} diff --git a/source/com/c2kernel/entity/proxy/ProxyServerConnection.java b/source/com/c2kernel/entity/proxy/ProxyServerConnection.java deleted file mode 100644 index 6807953..0000000 --- a/source/com/c2kernel/entity/proxy/ProxyServerConnection.java +++ /dev/null @@ -1,134 +0,0 @@ -/************************************************************************** - * EntityProxyFactory.java - * - * $Revision: 1.3 $ - * $Date: 2005/05/25 12:11:44 $ - * - * Copyright (C) 2001 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -package com.c2kernel.entity.proxy; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.InterruptedIOException; -import java.io.PrintWriter; -import java.net.Socket; - -import com.c2kernel.common.InvalidDataException; -import com.c2kernel.utils.Logger; - - -public class ProxyServerConnection extends Thread -{ - - public boolean serverIsActive = true; - // proxy client details - String serverName; - int serverPort; - Socket serverConnection; - EntityProxyManager manager; - // for talking to the proxy server - PrintWriter serverStream; - boolean listening = false; - static boolean isServer = false; - - /** - * Create an entity proxy manager to listen for proxy events and reap unused proxies - */ - public ProxyServerConnection(String host, int port, EntityProxyManager manager) - { - Logger.msg(5, "ProxyServerConnection - Initialising connection to "+host+":"+port); - serverName = host; - serverPort = port; - this.manager = manager; - listening = true; - start(); - } - - @Override - public void run() { - Thread.currentThread().setName("Proxy Client Connection Listener to "+serverName+":"+serverPort); - while (listening) { - try { - if (serverConnection == null) connect(); - if (serverConnection != null) { - BufferedReader request = new BufferedReader(new InputStreamReader(serverConnection.getInputStream())); - String input = null; - ProxyMessage thisMessage; - while (listening && serverConnection != null) { - try { - input = request.readLine(); - thisMessage = new ProxyMessage(input); - thisMessage.setServer(serverName); - manager.processMessage(thisMessage); - } catch (InterruptedIOException ex) { // timeout - send a ping - sendMessage(ProxyMessage.pingMessage); - } catch (InvalidDataException ex) { // invalid proxy message - if (input != null) - Logger.error("EntityProxyManager - Invalid proxy message: "+input); - } - } - } - } catch (IOException ex) { - Logger.error("ProxyServerConnection - Disconnected from "+serverName+":"+serverPort); - try { - serverStream.close(); - serverConnection.close(); - } catch (IOException e1) { } - - - serverStream = null; - serverConnection = null; - } - } - - if (serverStream != null) { - try { - Logger.msg(1, "Disconnecting from proxy server on "+serverName+":"+serverPort); - serverStream.println(ProxyMessage.byeMessage.toString()); - serverStream.close(); - serverConnection.close(); - serverConnection = null; - } catch (Exception e) { - Logger.error("Error disconnecting from proxy server."); - } - } - } - - public void connect() { - Logger.msg(3, "ProxyServerConnection - connecting to proxy server on "+serverName+":"+serverPort); - try { - serverConnection = new Socket(serverName, serverPort); - serverConnection.setKeepAlive(true); - serverIsActive = true; - serverConnection.setSoTimeout(5000); - serverStream = new PrintWriter(serverConnection.getOutputStream(), true); - Logger.msg("Connected to proxy server on "+serverName+":"+serverPort); - manager.resubscribe(this); - } catch (Exception e) { - Logger.msg(3, "Could not connect to proxy server. Retrying in 5s"); - try { Thread.sleep(5000); } catch (InterruptedException ex) { } - serverStream = null; - serverConnection = null; - serverIsActive = false; - } - } - - public void shutdown() { - Logger.msg("Proxy Client: flagging shutdown."); - listening = false; - } - - /** - * @param sub - */ - public void sendMessage(ProxyMessage sub) { - if (serverStream != null) - serverStream.println(sub); - } - -} - -- cgit v1.2.3