From b086f57f56bf0eb9dab9cf321a0f69aaaae84347 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 30 May 2012 08:37:45 +0200 Subject: Initial Maven Conversion --- .../java/com/c2kernel/entity/proxy/AgentProxy.java | 302 ++++++++++++++++++ .../entity/proxy/DomainPathSubscriber.java | 18 ++ .../com/c2kernel/entity/proxy/EntityProxy.java | 248 +++++++++++++++ .../c2kernel/entity/proxy/EntityProxyManager.java | 339 +++++++++++++++++++++ .../c2kernel/entity/proxy/EntityProxyObserver.java | 27 ++ .../java/com/c2kernel/entity/proxy/ItemProxy.java | 213 +++++++++++++ .../c2kernel/entity/proxy/MemberSubscription.java | 118 +++++++ .../entity/proxy/ProxyClientConnection.java | 185 +++++++++++ .../com/c2kernel/entity/proxy/ProxyMessage.java | 102 +++++++ .../entity/proxy/ProxyServerConnection.java | 134 ++++++++ 10 files changed, 1686 insertions(+) create mode 100644 src/main/java/com/c2kernel/entity/proxy/AgentProxy.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/DomainPathSubscriber.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/EntityProxy.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/EntityProxyManager.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/EntityProxyObserver.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/ItemProxy.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/ProxyMessage.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java (limited to 'src/main/java/com/c2kernel/entity/proxy') diff --git a/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java b/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java new file mode 100644 index 0000000..72ed088 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java @@ -0,0 +1,302 @@ +/************************************************************************** + * AgentProxy.java + * + * $Revision: 1.37 $ + * $Date: 2005/10/05 07:39:36 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.entity.proxy; + +import java.util.Date; +import java.util.Enumeration; + +import com.c2kernel.common.AccessRightsException; +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.common.InvalidTransitionException; +import com.c2kernel.common.ObjectAlreadyExistsException; +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.common.PersistencyException; +import com.c2kernel.entity.Agent; +import com.c2kernel.entity.AgentHelper; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.entity.ManageableEntity; +import com.c2kernel.entity.agent.Job; +import com.c2kernel.lifecycle.instance.predefined.PredefinedStep; +import com.c2kernel.lifecycle.instance.stateMachine.Transitions; +import com.c2kernel.lookup.AgentPath; +import com.c2kernel.lookup.DomainPath; +import com.c2kernel.lookup.EntityPath; +import com.c2kernel.lookup.InvalidEntityPathException; +import com.c2kernel.lookup.Path; +import com.c2kernel.persistency.outcome.OutcomeValidator; +import com.c2kernel.persistency.outcome.Schema; +import com.c2kernel.process.Gateway; +import com.c2kernel.scripting.ErrorInfo; +import com.c2kernel.scripting.Script; +import com.c2kernel.scripting.ScriptingEngineException; +import com.c2kernel.utils.CastorXMLUtility; +import com.c2kernel.utils.LocalObjectLoader; +import com.c2kernel.utils.Logger; + +/****************************************************************************** + * It is a wrapper for the connection and communication with Agent + * It caches data loaded from the Agent to reduce communication + * + * @version $Revision: 1.37 $ $Date: 2005/10/05 07:39:36 $ + * @author $Author: abranson $ + ******************************************************************************/ +public class AgentProxy extends EntityProxy +{ + AgentPath path; + + /************************************************************************** + * Creates an AgentProxy without cache and change notification + **************************************************************************/ + public AgentProxy( org.omg.CORBA.Object ior, + int systemKey) + throws ObjectNotFoundException + { + super(ior, systemKey); + try { + path = new AgentPath(systemKey); + } catch (InvalidEntityPathException e) { + throw new ObjectNotFoundException(); + } + } + + @Override + public ManageableEntity narrow() throws ObjectNotFoundException + { + try { + return AgentHelper.narrow(mIOR); + } catch (org.omg.CORBA.BAD_PARAM ex) { } + throw new ObjectNotFoundException("CORBA Object was not an Agent, or the server is down."); + } + /************************************************************************** + * + * + **************************************************************************/ + public void initialise( String agentProps, String collector ) + throws AccessRightsException, + InvalidDataException, + PersistencyException, + ObjectNotFoundException + { + Logger.msg(7, "AgentProxy::initialise - started"); + + ((Agent)getEntity()).initialise( agentProps ); + } + + public AgentPath getPath() { + return path; + } + + /** + * Executes a job on the given item using this agent. + * + * @param item - item holding this job + * @param job - the job to execute + */ + public void execute(ItemProxy item, Job job) + throws AccessRightsException, + InvalidTransitionException, + ObjectNotFoundException, + InvalidDataException, + PersistencyException, + ObjectAlreadyExistsException + { + OutcomeValidator validator = null; + String scriptName = job.getActPropString("ScriptName"); + Date startTime = new Date(); + Logger.msg(3, "AgentProxy - executing "+job.getStepPath()+" for "+path.getAgentName()); + // get the outcome validator if present + if (job.isOutcomeUsed()) + { + + // get schema info from act props + String schemaName = job.getActPropString("SchemaType"); + int schemaVersion; + try { + schemaVersion = Integer.parseInt(job.getActPropString("SchemaVersion")); + } catch (Exception e) { + throw new InvalidDataException(e.getClass().getName()+" extracing schema version", ""); + } + Logger.msg(5, "AgentProxy - fetching schema "+schemaName+"_"+schemaVersion+" for validation"); + // retrieve schema + Schema schema = LocalObjectLoader.getSchema(schemaName, schemaVersion); + + if (schema == null) + throw new InvalidDataException("Job references outcome type "+schemaName+" version "+schemaVersion+" that does not exist in this centre.", ""); + + try { + validator = OutcomeValidator.getValidator(schema); + } catch (Exception e) { + throw new InvalidDataException("Could not create validator: "+e.getMessage(), ""); + } + } + + if(scriptName != null && scriptName.length() > 0 && + (job.getPossibleTransition() == Transitions.DONE || job.getPossibleTransition() == Transitions.COMPLETE)) { + Logger.msg(3, "AgentProxy - executing script "+scriptName); + try { + + // pre-validate outcome from script if there is one + if (job.getOutcomeString()!= null && validator != null) { + Logger.msg(5, "AgentProxy - validating outcome before script execution"); + String error = validator.validate(job.getOutcomeString()); + if (error.length() > 0) { + Logger.error("Outcome not valid: \n " + error); + throw new InvalidDataException(error, ""); + } + } + + // load script + ErrorInfo scriptErrors = (ErrorInfo)callScript(item, job); + + if (scriptErrors.getFatal()) { + Logger.msg(3, "AgentProxy - fatal script error"); + Logger.error(scriptErrors.getErrors()); + throw new InvalidDataException("Fatal Script Error: \n"+scriptErrors.getErrors(), ""); + } + if (scriptErrors.getErrors().length() > 0) + Logger.warning("Script errors: "+scriptErrors.getErrors()); + } catch (ScriptingEngineException ex) { + Logger.error(ex); + throw new InvalidDataException(ex.getMessage(), ""); + } + } + + if (job.isOutcomeUsed()) { + Logger.msg(3, "AgentProxy - validating outcome"); + String error = validator.validate(job.getOutcomeString()); + if (error.length() > 0) + throw new InvalidDataException(error, ""); + } + + job.setAgentId(getSystemKey()); + Logger.msg(3, "AgentProxy - submitting job to item proxy"); + item.requestAction(job); + if (Logger.doLog(3)) { + Date timeNow = new Date(); + long secsNow = (timeNow.getTime()-startTime.getTime())/1000; + Logger.msg(3, "Execution took "+secsNow+" seconds"); + } + } + + public Object callScript(ItemProxy item, Job job) throws ScriptingEngineException { + Script script = new Script(item, this, job); + return script.execute(); + } + + /** + * Standard execution of jobs. Note that this method should always be the one used from clients - all execution + * parameters are taken from the job where they're probably going to be correct. + * + * @param job + * @throws AccessRightsException + * @throws InvalidDataException + * @throws InvalidTransitionException + * @throws ObjectNotFoundException + * @throws PersistencyException + * @throws ObjectAlreadyExistsException + */ + public void execute(Job job) + throws AccessRightsException, + InvalidDataException, + InvalidTransitionException, + ObjectNotFoundException, + PersistencyException, + ObjectAlreadyExistsException + { + try { + ItemProxy targetItem = (ItemProxy)Gateway.getProxyManager().getProxy(new EntityPath(job.getItemSysKey())); + execute(targetItem, job); + } catch (InvalidEntityPathException e) { + throw new ObjectNotFoundException("Job contained invalid item sysKey: "+job.getItemSysKey(), ""); + } + } + + public void execute(ItemProxy item, String predefStep, C2KLocalObject obj) + throws AccessRightsException, + InvalidDataException, + InvalidTransitionException, + ObjectNotFoundException, + PersistencyException, + ObjectAlreadyExistsException + { + String param; + try { + param = marshall(obj); + } catch (Exception ex) { + Logger.error(ex); + throw new InvalidDataException("Error on marshall", ""); + } + execute(item, predefStep, param); + } + + public void execute(ItemProxy item, String predefStep, String param) + throws AccessRightsException, + InvalidDataException, + InvalidTransitionException, + ObjectNotFoundException, + PersistencyException, + ObjectAlreadyExistsException + { + String[] params = new String[1]; + params[0] = param; + execute(item, predefStep, params); + } + + public void execute(ItemProxy item, String predefStep, String[] params) + throws AccessRightsException, + InvalidDataException, + InvalidTransitionException, + ObjectNotFoundException, + PersistencyException, + ObjectAlreadyExistsException + { + item.requestAction(getSystemKey(), "workflow/predefined/"+predefStep, Transitions.DONE, PredefinedStep.bundleData(params)); + } + + /** Wrappers for scripts */ + public String marshall(Object obj) throws Exception { + return CastorXMLUtility.marshall(obj); + } + + public Object unmarshall(String obj) throws Exception { + return CastorXMLUtility.unmarshall(obj); + } + + /** Let scripts resolve items */ + public ItemProxy searchItem(String name) throws ObjectNotFoundException { + Enumeration results = Gateway.getLDAPLookup().search(new DomainPath(""),name); + + Path returnPath = null; + if (!results.hasMoreElements()) + throw new ObjectNotFoundException(name, ""); + + while(results.hasMoreElements()) { + Path nextMatch = results.nextElement(); + if (returnPath != null && nextMatch.getSysKey() != -1 && returnPath.getSysKey() != nextMatch.getSysKey()) + throw new ObjectNotFoundException("Too many items with that name"); + returnPath = nextMatch; + } + + return (ItemProxy)Gateway.getProxyManager().getProxy(returnPath); + } + + public ItemProxy getItem(String path) throws ObjectNotFoundException { + return (getItem(new DomainPath(path))); + } + + public ItemProxy getItem(DomainPath path) throws ObjectNotFoundException { + return (ItemProxy)Gateway.getProxyManager().getProxy(path); + } + + public ItemProxy getItemBySysKey(int sysKey) throws ObjectNotFoundException, InvalidEntityPathException { + return (ItemProxy)Gateway.getProxyManager().getProxy(new EntityPath(sysKey)); + } +} diff --git a/src/main/java/com/c2kernel/entity/proxy/DomainPathSubscriber.java b/src/main/java/com/c2kernel/entity/proxy/DomainPathSubscriber.java new file mode 100644 index 0000000..4089325 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/DomainPathSubscriber.java @@ -0,0 +1,18 @@ +package com.c2kernel.entity.proxy; + +import com.c2kernel.lookup.DomainPath; + +/************************************************************************** + * + * $Revision: 1.1 $ + * $Date: 2004/02/05 16:11:57 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +public interface DomainPathSubscriber { + + public void pathAdded(DomainPath path); + public void pathRemoved(DomainPath path); +} diff --git a/src/main/java/com/c2kernel/entity/proxy/EntityProxy.java b/src/main/java/com/c2kernel/entity/proxy/EntityProxy.java new file mode 100644 index 0000000..fae2e28 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/EntityProxy.java @@ -0,0 +1,248 @@ +/************************************************************************** + * EntityProxy.java + * + * $Revision: 1.35 $ + * $Date: 2005/05/10 11:40:09 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.entity.proxy; + +import java.util.HashMap; +import java.util.Iterator; + +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.entity.ManageableEntity; +import com.c2kernel.persistency.ClusterStorageException; +import com.c2kernel.process.Gateway; +import com.c2kernel.property.Property; +import com.c2kernel.utils.CastorXMLUtility; +import com.c2kernel.utils.Logger; + + +/****************************************************************************** +* It is a wrapper for the connection and communication with Entities. +* It can cache data loaded from the Entity to reduce communication with it. +* This cache is syncronised with corresponding Entity through an event mechanism. +* +* @version $Revision: 1.35 $ $Date: 2005/05/10 11:40:09 $ +* @author $Author: abranson $ +******************************************************************************/ + +abstract public class EntityProxy implements ManageableEntity +{ + + protected ManageableEntity mEntity = null; + protected org.omg.CORBA.Object mIOR; + protected int mSystemKey; + private HashMap, EntityProxyObserver> mSubscriptions; + + /************************************************************************** + * + **************************************************************************/ + protected EntityProxy( org.omg.CORBA.Object ior, + int systemKey) + throws ObjectNotFoundException + { + Logger.msg(8,"EntityProxy::EntityProxy() - Initialising '" +systemKey+ "' entity"); + + initialise( ior, systemKey); + } + + /************************************************************************** + * + **************************************************************************/ + private void initialise( org.omg.CORBA.Object ior, + int systemKey) + throws ObjectNotFoundException + { + Logger.msg(8, "EntityProxy::initialise() - Initialising '" +systemKey+ "' entity"); + + mIOR = ior; + mSystemKey = systemKey; + mSubscriptions = new HashMap, EntityProxyObserver>(); + } + + + /************************************************************************** + * + **************************************************************************/ + public ManageableEntity getEntity() throws ObjectNotFoundException + { + if (mEntity == null) { + mEntity = narrow(); + } + return mEntity; + } + + abstract public ManageableEntity narrow() throws ObjectNotFoundException; + + /************************************************************************** + * + **************************************************************************/ + //check who is using.. and if toString() is sufficient + @Override + public int getSystemKey() + { + return mSystemKey; + } + + + /************************************************************************** + * + **************************************************************************/ + @Override + public String queryData( String path ) + throws ObjectNotFoundException + { + + try { + Logger.msg(7, "EntityProxy.queryData() - "+mSystemKey+"/"+path); + if (path.endsWith("all")) { + Logger.msg(7, "EntityProxy.queryData() - listing contents"); + String[] result = Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()-3)); + StringBuffer retString = new StringBuffer(); + for (int i = 0; i < result.length; i++) { + retString.append(result[i]); + if (i"+e.getMessage()+""; + } + } + + public String[] getContents( String path ) throws ObjectNotFoundException { + try { + return Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length())); + } catch (ClusterStorageException e) { + throw new ObjectNotFoundException(e.toString()); + } + } + + + /************************************************************************** + * + **************************************************************************/ + public C2KLocalObject getObject( String xpath ) + throws ObjectNotFoundException + { + // load from storage, falling back to proxy loader if not found in others + try + { + return Gateway.getStorage().get( mSystemKey, xpath , null); + } + catch( ClusterStorageException ex ) + { + Logger.msg(4, "Exception loading object :"+mSystemKey+"/"+xpath); + throw new ObjectNotFoundException( ex.toString() ); + } + } + + + + public String getProperty( String name ) + throws ObjectNotFoundException + { + Logger.msg(5, "Get property "+name+" from syskey/"+mSystemKey); + Property prop = (Property)getObject("Property/"+name); + try + { + return prop.getValue(); + } + catch (NullPointerException ex) + { + throw new ObjectNotFoundException(); + } + } + + public String getName() + { + try { + return getProperty("Name"); + } catch (ObjectNotFoundException ex) { + return null; + } + } + + + /************************************************************************** + * Subscription methods + **************************************************************************/ + + public void subscribe (MemberSubscription newSub) { + + newSub.setSubject(this); + synchronized (this){ + mSubscriptions.put( newSub, newSub.getObserver() ); + } + new Thread(newSub).start(); + Logger.msg(7, "Subscribed "+newSub.getObserver().getClass().getName()+" for "+newSub.interest); + } + + public void unsubscribe(EntityProxyObserver observer) + { + synchronized (this){ + for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { + MemberSubscription thisSub = e.next(); + if (mSubscriptions.get( thisSub ) == observer) { + e.remove(); + Logger.msg(7, "Unsubscribed "+observer.getClass().getName()); + } + } + } + } + + public void dumpSubscriptions(int logLevel) { + if (mSubscriptions.size() == 0) return; + Logger.msg(logLevel, "Subscriptions to proxy "+mSystemKey+":"); + synchronized(this) { + for (MemberSubscription element : mSubscriptions.keySet()) { + EntityProxyObserver obs = element.getObserver(); + if (obs != null) + Logger.msg(logLevel, " "+element.getObserver().getClass().getName()+" subscribed to "+element.interest); + else + Logger.msg(logLevel, " Phantom subscription to "+element.interest); + } + } + } + + public void notify(ProxyMessage message) { + Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey); + synchronized (this){ + if (!message.getServer().equals(EntityProxyManager.serverName)) + Gateway.getStorage().clearCache(mSystemKey, message.getPath()); + for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { + MemberSubscription newSub = e.next(); + if (newSub.getObserver() == null) { // phantom + Logger.msg(4, "Removing phantom subscription to "+newSub.interest); + e.remove(); + } + else + newSub.update(message.getPath(), message.getState()); + } + } + } + + /** + * If this is reaped, clear out the cache for it too. + */ + @Override + protected void finalize() throws Throwable { + Logger.msg(7, "Proxy "+mSystemKey+" reaped"); + Gateway.getStorage().clearCache(mSystemKey, null); + Gateway.getProxyManager().removeProxy(mSystemKey); + super.finalize(); + } + +} diff --git a/src/main/java/com/c2kernel/entity/proxy/EntityProxyManager.java b/src/main/java/com/c2kernel/entity/proxy/EntityProxyManager.java new file mode 100644 index 0000000..192a984 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/EntityProxyManager.java @@ -0,0 +1,339 @@ +/************************************************************************** + * EntityProxyFactory.java + * + * $Revision: 1.45 $ + * $Date: 2005/05/10 11:40:09 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.entity.proxy; + +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Iterator; + +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.lookup.AgentPath; +import com.c2kernel.lookup.DomainPath; +import com.c2kernel.lookup.Path; +import com.c2kernel.persistency.ClusterStorage; +import com.c2kernel.process.Gateway; +import com.c2kernel.property.Property; +import com.c2kernel.utils.Logger; +import com.c2kernel.utils.SoftCache; +import com.c2kernel.utils.server.SimpleTCPIPServer; + + +public class EntityProxyManager +{ + SoftCache proxyPool = new SoftCache(50); + HashMap treeSubscribers = new HashMap(); + HashMap connections = new HashMap(); + + // server objects + static ArrayList proxyClients = new ArrayList(); + static SimpleTCPIPServer proxyServer = null; + static String serverName = null; + + /** + * Create an entity proxy manager to listen for proxy events and reap unused proxies + */ + public EntityProxyManager() + { + Logger.msg(5, "EntityProxyManager - Starting....."); + + Enumeration servers = Gateway.getLDAPLookup().searchEntities(new DomainPath("/servers")); + while(servers.hasMoreElements()) { + Path thisServerPath = servers.nextElement(); + try { + int syskey = thisServerPath.getSysKey(); + String remoteServer = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/Name", null)).getValue(); + String portStr = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/ProxyPort", null)).getValue(); + int remotePort = Integer.parseInt(portStr); + connectToProxyServer(remoteServer, remotePort); + + } catch (Exception ex) { + Logger.error("Exception retrieving proxy server connection data for "+thisServerPath); + Logger.error(ex); + } + } + } + + public void connectToProxyServer(String name, int port) { + ProxyServerConnection oldConn = connections.get(name); + if (oldConn != null) + oldConn.shutdown(); + connections.put(name, new ProxyServerConnection(name, port, this)); + } + + + protected void resubscribe(ProxyServerConnection conn) { + synchronized (proxyPool) { + for (Integer key : proxyPool.keySet()) { + ProxyMessage sub = new ProxyMessage(key.intValue(), ProxyMessage.ADDPATH, false); + Logger.msg(5, "Subscribing to entity "+key); + conn.sendMessage(sub); + } + } + } + + /** + * @param sub + */ + private void sendMessage(ProxyMessage sub) { + for (ProxyServerConnection element : connections.values()) { + element.sendMessage(sub); + } + + } + + public void shutdown() { + Logger.msg("EntityProxyManager.shutdown() - flagging shutdown of server connections"); + for (ProxyServerConnection element : connections.values()) { + element.shutdown(); + } + } + + protected void processMessage(ProxyMessage thisMessage) throws InvalidDataException { + if (Logger.doLog(9)) Logger.msg(9, thisMessage.toString()); + + if (thisMessage.getPath().equals(ProxyMessage.PINGPATH)) // ping response + return; + + if (thisMessage.getSysKey() == ProxyMessage.NA) // must be domain path info + informTreeSubscribers(thisMessage.getState(), thisMessage.getPath()); + else { + // proper proxy message + Logger.msg(5, "Received proxy message: "+thisMessage.toString()); + Integer key = new Integer(thisMessage.getSysKey()); + EntityProxy relevant = proxyPool.get(key); + if (relevant == null) + Logger.warning("Received proxy message for sysKey "+thisMessage.getSysKey()+" which we don't have a proxy for."); + else + try { + relevant.notify(thisMessage); + } catch (Throwable ex) { + Logger.error("Error caught notifying proxy listener "+relevant.toString()+" of "+thisMessage.toString()); + Logger.error(ex); + } + } + } + + private void informTreeSubscribers(boolean state, String path) { + DomainPath last = new DomainPath(path); + DomainPath parent; boolean first = true; + synchronized(treeSubscribers) { + while((parent = last.getParent()) != null) { + + for (DomainPathSubscriber sub : treeSubscribers.keySet()) { + DomainPath interest = treeSubscribers.get(sub); + if (interest.equals(parent)) { + if (state == ProxyMessage.ADDED) + sub.pathAdded(last); + else if (first) + sub.pathRemoved(last); + } + } + last = parent; + first = false; + } + } + } + + public void subscribeTree(DomainPathSubscriber sub, DomainPath interest) { + synchronized(treeSubscribers) { + treeSubscribers.put(sub, interest); + } + } + + public void unsubscribeTree(DomainPathSubscriber sub) { + synchronized(treeSubscribers) { + treeSubscribers.remove(sub); + } + } + + /************************************************************************** + * + **************************************************************************/ + private EntityProxy createProxy( org.omg.CORBA.Object ior, + int systemKey, + boolean isItem ) + throws ObjectNotFoundException + { + + EntityProxy newProxy = null; + + Logger.msg(5, "EntityProxyFactory::creating proxy on entity " + systemKey); + + if( isItem ) + { + newProxy = new ItemProxy(ior, systemKey); + } + else + { + newProxy = new AgentProxy(ior, systemKey); + } + + // subscribe to changes from server + ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.ADDPATH, false); + sendMessage(sub); + reportCurrentProxies(9); + return ( newProxy ); + } + + protected void removeProxy( int systemKey ) + { + ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.DELPATH, true); + Logger.msg(5,"EntityProxyManager.removeProxy() - Unsubscribing to proxy informer for "+systemKey); + sendMessage(sub); + } + + + /************************************************************************** + * EntityProxy getProxy( ManageableEntity, SystemKey) + * + * Called by the other GetProxy methods. Fills in either the ior or the + * SystemKey + **************************************************************************/ + private EntityProxy getProxy( org.omg.CORBA.Object ior, + int systemKey, + boolean isItem ) + throws ObjectNotFoundException + { + Integer key = new Integer(systemKey); + + synchronized(proxyPool) { + EntityProxy newProxy; + // return it if it exists + newProxy = proxyPool.get(key); + if (newProxy == null) { + // create a new one + newProxy = createProxy(ior, systemKey, isItem ); + proxyPool.put(key, newProxy); + } + return newProxy; + + } + } + + /************************************************************************** + * EntityProxy getProxy( String ) + * + * Proxy from Alias + **************************************************************************/ + public EntityProxy getProxy( Path path ) + throws ObjectNotFoundException + { + + //convert namePath to dn format + Logger.msg(8,"EntityProxyFactory::getProxy(" + path.toString() + ")"); + boolean isItem = !(path.getEntity() instanceof AgentPath); + return getProxy( Gateway.getLDAPLookup().getIOR(path), + path.getSysKey(), + isItem ); + + } + + /************************************************************************** + * void reportCurrentProxies() + * + * A utility to Dump the current proxies loaded + **************************************************************************/ + public void reportCurrentProxies(int logLevel) + { + if (!Logger.doLog(logLevel)) return; + Logger.msg(logLevel, "Current proxies: "); + try { + synchronized(proxyPool) { + Iterator i = proxyPool.keySet().iterator(); + + for( int count=0; i.hasNext(); count++ ) + { + Integer nextProxy = i.next(); + EntityProxy thisProxy = proxyPool.get(nextProxy); + if (thisProxy != null) + Logger.msg(logLevel, + "" + count + ": " + + proxyPool.get(nextProxy).getClass().getName() + + ": " + nextProxy); + } + } + } catch (ConcurrentModificationException ex) { + Logger.msg(logLevel, "Proxy cache modified. Aborting."); + } + } + + + /************************************************************************** + * Static Proxy Server methods + **************************************************************************/ + + /** + * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops + * @param c2kProps + */ + public static void initServer() + { + Logger.msg(5, "EntityProxyFactory::initServer - Starting....."); + String port = Gateway.getProperty("ItemServer.Proxy.port"); + serverName = Gateway.getProperty("ItemServer.name"); + if (port == null) { + Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of entity changes."); + return; + } + + // set up the proxy server + try { + int portNo = Integer.parseInt(port); + Logger.msg(5, "EntityProxyFactory::initServer - Initialising proxy informer on port "+port); + proxyServer = new SimpleTCPIPServer(portNo, ProxyClientConnection.class, 200); + proxyServer.startListening(); + } catch (Exception ex) { + Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of entity changes."); + Logger.error(ex); + } + } + + public static void sendProxyEvent(ProxyMessage message) { + if (proxyServer != null && message.getPath() != null) + synchronized(proxyClients) { + for (ProxyClientConnection client : proxyClients) { + client.sendMessage(message); + } + } + } + + public static void reportConnections(int logLevel) { + synchronized(proxyClients) { + Logger.msg(logLevel, "Currently connected proxy clients:"); + for (ProxyClientConnection client : proxyClients) { + Logger.msg(logLevel, " "+client); + } + } + } + + public static void shutdownServer() { + if (proxyServer != null) { + Logger.msg(1, "EntityProxyManager: Closing Server."); + proxyServer.stopListening(); + } + } + + public static void registerProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.add(client); + } + } + + public static void unRegisterProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.remove(client); + } + } +} + diff --git a/src/main/java/com/c2kernel/entity/proxy/EntityProxyObserver.java b/src/main/java/com/c2kernel/entity/proxy/EntityProxyObserver.java new file mode 100644 index 0000000..3ddb99c --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/EntityProxyObserver.java @@ -0,0 +1,27 @@ +package com.c2kernel.entity.proxy; + +import com.c2kernel.entity.C2KLocalObject; + + + +public interface EntityProxyObserver +{ + /************************************************************************** + * Subscribed items are broken apart and fed one by one to these methods. + * Replacement after an event is done by feeding the new memberbase with the same id. + * ID could be an XPath? + **************************************************************************/ + public void add(V contents); + + /************************************************************************** + * the 'type' parameter should be an indication of the type of object + * supplied so that the subscriber can associate the call back with + * one of its subscriptions. If we go with an Xpath subscription form, + * then the id will probably be sufficient. + * Should be comparable (substring whatever) with the parameter given to + * the subscribe method of ItemProxy. + **************************************************************************/ + public void remove(String id); + + public void control(String control, String msg); +} diff --git a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java new file mode 100644 index 0000000..658e0c8 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java @@ -0,0 +1,213 @@ +/************************************************************************** + * ItemProxy.java + * + * $Revision: 1.25 $ + * $Date: 2005/05/10 11:40:09 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.entity.proxy; + +import java.util.ArrayList; + +import com.c2kernel.common.AccessRightsException; +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.common.InvalidTransitionException; +import com.c2kernel.common.ObjectAlreadyExistsException; +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.common.PersistencyException; +import com.c2kernel.entity.Item; +import com.c2kernel.entity.ItemHelper; +import com.c2kernel.entity.ManageableEntity; +import com.c2kernel.entity.agent.Job; +import com.c2kernel.entity.agent.JobArrayList; +import com.c2kernel.lifecycle.instance.stateMachine.Transitions; +import com.c2kernel.utils.CastorXMLUtility; +import com.c2kernel.utils.Logger; + +/****************************************************************************** + * It is a wrapper for the connection and communication with Item + * It caches data loaded from the Item to reduce communication + * + * @version $Revision: 1.25 $ $Date: 2005/05/10 11:40:09 $ + * @author $Author: abranson $ + ******************************************************************************/ +public class ItemProxy extends EntityProxy +{ + + /************************************************************************** + * + **************************************************************************/ + protected ItemProxy( org.omg.CORBA.Object ior, + int systemKey) + throws ObjectNotFoundException + { + super(ior, systemKey); + + } + + @Override + public ManageableEntity narrow() throws ObjectNotFoundException + { + try { + return ItemHelper.narrow(mIOR); + } catch (org.omg.CORBA.BAD_PARAM ex) { } + throw new ObjectNotFoundException("CORBA Object was not an Item, or the server is down."); + } + /************************************************************************** + * + * + **************************************************************************/ + public void initialise( int agentId, + String itemProps, + String workflow ) + throws AccessRightsException, + InvalidDataException, + PersistencyException, + ObjectNotFoundException + { + Logger.msg(7, "ItemProxy::initialise - started"); + + ((Item)getEntity()).initialise( agentId, itemProps, workflow ); + } + + public void setProperty(AgentProxy agent, String name, String value) + throws AccessRightsException, + PersistencyException + { + String[] params = new String[2]; + params[0] = name; + params[1] = value; + try { + agent.execute(this, "WriteProperty", params); + } catch (AccessRightsException e) { + throw (e); + } catch (PersistencyException e) { + throw (e); + } catch (Exception e) { + Logger.error(e); + throw new PersistencyException("Could not store property"); + } + } + /************************************************************************** + * + **************************************************************************/ + protected void requestAction( Job thisJob ) + throws AccessRightsException, + InvalidTransitionException, + ObjectNotFoundException, + InvalidDataException, + PersistencyException, + ObjectAlreadyExistsException + { + String outcome = thisJob.getOutcomeString(); + // check fields that should have been filled in + if (outcome==null) + if (thisJob.isOutcomeUsed()) + throw new InvalidDataException("Outcome is required.", ""); + else + outcome=""; + + if (thisJob.getAgentId() == -1) + throw new InvalidDataException("No Agent specified.", ""); + + Logger.msg(7, "ItemProxy - executing "+thisJob.getStepPath()+" for "+thisJob.getAgentName()); + requestAction (thisJob.getAgentId(), thisJob.getStepPath(), + thisJob.getPossibleTransition(), outcome); + } + + //requestData is xmlString + public void requestAction( int agentId, + String stepPath, + int transitionID, + String requestData + ) + throws AccessRightsException, + InvalidTransitionException, + ObjectNotFoundException, + InvalidDataException, + PersistencyException, + ObjectAlreadyExistsException + { + ((Item)getEntity()).requestAction( agentId, + stepPath, + transitionID, + requestData ); + } + + /************************************************************************** + * + **************************************************************************/ + public String queryLifeCycle( int agentId, + boolean filter + ) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException + { + return ((Item)getEntity()).queryLifeCycle( agentId, + filter ); + } + + + /************************************************************************** + * + **************************************************************************/ + private ArrayList getJobList(int agentId, boolean filter) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException + { + JobArrayList thisJobList; + try { + String jobs = queryLifeCycle(agentId, filter); + thisJobList = (JobArrayList)CastorXMLUtility.unmarshall(jobs); + } + catch (Exception e) { + Logger.error(e); + throw new PersistencyException("Exception::ItemProxy::getJobList() - Cannot unmarshall the jobs", null); + } + return thisJobList.list; + } + + public ArrayList getJobList(AgentProxy agent) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException + { + return getJobList(agent.getSystemKey()); + } + + private ArrayList getJobList(int agentId) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException + { + return getJobList(agentId, true); + } + + private Job getJobByName(String actName, int agentId) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException { + + ArrayList jobList = getJobList(agentId); + for (Job job : jobList) { + int transition = job.getPossibleTransition(); + if (job.getStepName().equals(actName)) + if (transition == Transitions.COMPLETE || transition == Transitions.DONE) + return job; + } + return null; + + } + + public Job getJobByName(String actName, AgentProxy agent) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException { + return getJobByName(actName, agent.getSystemKey()); + } +} diff --git a/src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java b/src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java new file mode 100644 index 0000000..157297f --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java @@ -0,0 +1,118 @@ + +package com.c2kernel.entity.proxy; +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.StringTokenizer; + +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.utils.Logger; + +public class MemberSubscription implements Runnable { + public static final String ERROR = "Error"; + public static final String END = "theEND"; + + EntityProxy subject; + String interest; + // keep the subscriber by weak reference, so it is not kept from the garbage collector if no longer used + WeakReference> observerReference; + ArrayList contents = new ArrayList(); + boolean preLoad; + + public MemberSubscription(EntityProxyObserver observer, String interest, boolean preLoad) { + setObserver(observer); + this.interest = interest; + this.preLoad = preLoad; + } + + @Override + public void run() { + Thread.currentThread().setName("Member Subscription: "+subject.getSystemKey()+":"+interest); + if (preLoad) loadChildren(); + } + + private void loadChildren() { + C newMember; + EntityProxyObserver observer = getObserver(); + if (observer == null) return; //reaped + try { + // fetch contents of path + String children = subject.queryData(interest+"/all"); + StringTokenizer tok = new StringTokenizer(children, ","); + ArrayList newContents = new ArrayList(); + while (tok.hasMoreTokens()) + newContents.add(tok.nextToken()); + + // look to see what's new + for (String newChild: newContents) { + + // load child object + try { + newMember = (C)subject.getObject(interest+"/"+newChild); + contents.remove(newChild); + observer.add(newMember); + } catch (ObjectNotFoundException ex) { + observer.control(ERROR, "Listed member "+newChild+" was not found."); + } + } + // report what's left in old contents as deleted + for (String oldChild: contents) { + observer.remove(interest+"/"+oldChild); + } + //replace contents arraylist + contents = newContents; + //report that we're done + observer.control(END, null); + } catch (Exception ex) { + observer.control(ERROR, "Query on "+interest+" failed with "+ex); + } + } + + public boolean isRelevant(String path) { + Logger.msg(7, "Checking relevance of "+path+" to "+interest); + return (path.startsWith(interest)); + } + + public void update(String path, boolean deleted) { + EntityProxyObserver observer = getObserver(); + if (observer == null) return; //reaped + Logger.msg(7, "Processing proxy message path "+path +" for "+observer+". Interest: "+interest+" Was Deleted:"+deleted); + if (!path.startsWith(interest)) // doesn't concern us + return; + + if (path.equals(interest)) // refresh contents + loadChildren(); + else { + String name = path.substring(interest.length()); + if (deleted) { + Logger.msg(4, "Removing "+path); + contents.remove(name); + observer.remove(name); + } + else { + try { + C newMember = (C)subject.getObject(path); + Logger.msg(4, "Adding "+path); + contents.add(name); + observer.add(newMember); + } catch (ObjectNotFoundException e) { + Logger.error("Member Subscription: could not load "+path); + Logger.error(e); + } + } + } + } + + public void setObserver(EntityProxyObserver observer) { + observerReference = new WeakReference>(observer); + } + + public void setSubject(EntityProxy subject) { + this.subject = subject; + } + + public EntityProxyObserver getObserver() { + return observerReference.get(); + } +} + diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java new file mode 100644 index 0000000..9687f22 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java @@ -0,0 +1,185 @@ +package com.c2kernel.entity.proxy; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.InterruptedIOException; +import java.io.PrintWriter; +import java.net.Socket; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.Iterator; + +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.utils.Logger; +import com.c2kernel.utils.server.SocketHandler; + +/************************************************************************** + * + * $Revision: 1.18 $ + * $Date: 2005/05/10 11:40:09 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +public class ProxyClientConnection implements SocketHandler { + + Socket clientSocket = null; + static int clientId = -1; + int thisClientId; + ArrayList sysKeys; + PrintWriter response; + BufferedReader request; + boolean closing = false; + + public ProxyClientConnection() { + super(); + thisClientId = ++clientId; + EntityProxyManager.registerProxyClient(this); + Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready."); + } + + + @Override + public String getName() { + return "Proxy Client Connection"; + } + + @Override + public boolean isBusy() { + return clientSocket != null; + } + + @Override + public synchronized void setSocket(Socket newSocket) { + try { + Logger.msg(1, "Proxy Client Connection "+thisClientId+" connect from "+newSocket.getInetAddress()+":"+newSocket.getPort()); + newSocket.setSoTimeout(500); + clientSocket = newSocket; + response = new PrintWriter(clientSocket.getOutputStream(), true); + sysKeys = new ArrayList(); + } catch (SocketException ex) { + Logger.msg("Could not set socket timeout:"); + Logger.error(ex); + closeSocket(); + } catch (IOException ex) { + Logger.msg("Could not setup output stream:"); + Logger.error(ex); + closeSocket(); + } + } + + /** + * Main loop. Reads proxy commands from the client and acts on them. + */ + @Override + public void run() { + Thread.currentThread().setName("Proxy Client Connection: "+clientSocket.getInetAddress()); + Logger.msg(7, "ProxyClientConnection "+thisClientId+" - Setting up proxy client connection with "+clientSocket.getInetAddress()); + try { + request = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); + String input = null; + ProxyMessage thisMessage; + while (clientSocket != null) { + try { + input = request.readLine(); + Logger.msg(9, "ProxyClientConnection "+thisClientId+" - received "+input); + thisMessage = new ProxyMessage(input); + processMessage(thisMessage); + } catch (InterruptedIOException ex) { //timeout + } catch (InvalidDataException ex) { // invalid proxy message + Logger.error("ProxyClientConnection "+thisClientId+" - Invalid proxy message: "+input); + } + + } + } catch (IOException ex) { + if (!closing) + Logger.error("ProxyClientConnection "+thisClientId+" - Error reading from socket."); + } + closeSocket(); + Logger.msg(1, "ProxyClientConnection "+thisClientId+" closed."); + } + + private void processMessage(ProxyMessage message) throws InvalidDataException { + + // proxy disconnection + if (message.getPath().equals(ProxyMessage.BYEPATH)) { + Logger.msg(7, "ProxyClientConnection "+thisClientId+" disconnecting"); + closeSocket(); + } + + // proxy checking connection + else if (message.getPath().equals(ProxyMessage.PINGPATH)) + response.println(ProxyMessage.pingMessage); + + // new subscription to entity changes + else if (message.getPath().equals(ProxyMessage.ADDPATH)) { + Logger.msg(7, "ProxyClientConnection "+thisClientId+" subscribed to "+message.getSysKey()); + synchronized (sysKeys) { + sysKeys.add(new Integer(message.getSysKey())); + } + } + + // remove of subscription to entity changes + else if (message.getPath().equals(ProxyMessage.DELPATH)) { + synchronized (sysKeys) { + sysKeys.remove(new Integer(message.getSysKey())); + } + Logger.msg(7, "ProxyClientConnection "+thisClientId+" unsubscribed from "+message.getSysKey()); + } + + else // unknown message + Logger.error("ProxyClientConnection "+thisClientId+" - Unknown message type: "+message); + + } + + public synchronized void sendMessage(ProxyMessage message) { + if (clientSocket==null) return; // idle + boolean relevant = message.getSysKey() == ProxyMessage.NA; + synchronized (sysKeys) { + for (Iterator iter = sysKeys.iterator(); iter.hasNext() && !relevant;) { + Integer thisKey = iter.next(); + if (thisKey.intValue() == message.getSysKey()) + relevant = true; + } + } + if (!relevant) return; // not for our client + + response.println(message); + } + + @Override + public void shutdown() { + if (isBusy()) { + closing = true; + Logger.msg("ProxyClientConnection "+thisClientId+" closing."); + closeSocket(); + } + } + + @Override + public String toString() { + if (clientSocket == null) return thisClientId+": idle"; + else return thisClientId+": "+clientSocket.getInetAddress(); + } + + private synchronized void closeSocket() { + if (clientSocket==null) return; + try { + request.close(); + response.close(); + clientSocket.close(); + } catch (IOException e) { + Logger.error("ProxyClientConnection "+thisClientId+" - Could not close socket."); + Logger.error(e); + } + synchronized (sysKeys) { + sysKeys = null; + } + + clientSocket = null; + + } + +} diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyMessage.java b/src/main/java/com/c2kernel/entity/proxy/ProxyMessage.java new file mode 100644 index 0000000..62866eb --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyMessage.java @@ -0,0 +1,102 @@ +package com.c2kernel.entity.proxy; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.util.StringTokenizer; + +import com.c2kernel.common.InvalidDataException; + + +/************************************************************************** + * + * $Revision: 1.11 $ + * $Date: 2005/05/10 11:40:09 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +public class ProxyMessage { + + // special server message paths + public static final String BYEPATH = "bye"; + public static final String ADDPATH = "add"; + public static final String DELPATH = "del"; + public static final String PINGPATH = "ping"; + public static final boolean ADDED = false; + public static final boolean DELETED = true; + public static final int NA = -1; + + static ProxyMessage byeMessage = new ProxyMessage(NA, BYEPATH, ADDED); + static ProxyMessage pingMessage = new ProxyMessage(NA, PINGPATH, ADDED); + + private int sysKey = NA; + private String path = ""; + private String server = null; + private boolean state = ADDED; + + public ProxyMessage() { + super(); + } + public ProxyMessage(int sysKey, String path, boolean state) { + this(); + setSysKey(sysKey); + setPath(path); + setState(state); + } + + public ProxyMessage(String line) throws InvalidDataException, IOException { + if (line == null) + throw new IOException("Null proxy message"); + StringTokenizer tok = new StringTokenizer(line,":"); + if (tok.countTokens()!=2) + throw new InvalidDataException("String '"+line+"' does not constitute a valid proxy message.", ""); + sysKey = Integer.parseInt(tok.nextToken()); + path = tok.nextToken(); + if (path.startsWith("-")) { + state = DELETED; + path = path.substring(1); + } + } + + public ProxyMessage(DatagramPacket packet) throws InvalidDataException, IOException { + this(new String(packet.getData())); + } + + public int getSysKey() { + return sysKey; + } + + public void setSysKey(int sysKey) { + this.sysKey = sysKey; + } + + public String getPath() { + return path; + } + + public void setPath(String newPath) { + this.path = newPath; + } + + public boolean getState() { + return state; + } + + public void setState(boolean state) { + this.state = state; + } + + @Override + public String toString() { + return sysKey+":"+(state?"-":"")+path; + } + + public String getServer() { + return server; + } + + public void setServer(String server) { + this.server = server; + } +} diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java b/src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java new file mode 100644 index 0000000..6807953 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java @@ -0,0 +1,134 @@ +/************************************************************************** + * EntityProxyFactory.java + * + * $Revision: 1.3 $ + * $Date: 2005/05/25 12:11:44 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.entity.proxy; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.InterruptedIOException; +import java.io.PrintWriter; +import java.net.Socket; + +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.utils.Logger; + + +public class ProxyServerConnection extends Thread +{ + + public boolean serverIsActive = true; + // proxy client details + String serverName; + int serverPort; + Socket serverConnection; + EntityProxyManager manager; + // for talking to the proxy server + PrintWriter serverStream; + boolean listening = false; + static boolean isServer = false; + + /** + * Create an entity proxy manager to listen for proxy events and reap unused proxies + */ + public ProxyServerConnection(String host, int port, EntityProxyManager manager) + { + Logger.msg(5, "ProxyServerConnection - Initialising connection to "+host+":"+port); + serverName = host; + serverPort = port; + this.manager = manager; + listening = true; + start(); + } + + @Override + public void run() { + Thread.currentThread().setName("Proxy Client Connection Listener to "+serverName+":"+serverPort); + while (listening) { + try { + if (serverConnection == null) connect(); + if (serverConnection != null) { + BufferedReader request = new BufferedReader(new InputStreamReader(serverConnection.getInputStream())); + String input = null; + ProxyMessage thisMessage; + while (listening && serverConnection != null) { + try { + input = request.readLine(); + thisMessage = new ProxyMessage(input); + thisMessage.setServer(serverName); + manager.processMessage(thisMessage); + } catch (InterruptedIOException ex) { // timeout - send a ping + sendMessage(ProxyMessage.pingMessage); + } catch (InvalidDataException ex) { // invalid proxy message + if (input != null) + Logger.error("EntityProxyManager - Invalid proxy message: "+input); + } + } + } + } catch (IOException ex) { + Logger.error("ProxyServerConnection - Disconnected from "+serverName+":"+serverPort); + try { + serverStream.close(); + serverConnection.close(); + } catch (IOException e1) { } + + + serverStream = null; + serverConnection = null; + } + } + + if (serverStream != null) { + try { + Logger.msg(1, "Disconnecting from proxy server on "+serverName+":"+serverPort); + serverStream.println(ProxyMessage.byeMessage.toString()); + serverStream.close(); + serverConnection.close(); + serverConnection = null; + } catch (Exception e) { + Logger.error("Error disconnecting from proxy server."); + } + } + } + + public void connect() { + Logger.msg(3, "ProxyServerConnection - connecting to proxy server on "+serverName+":"+serverPort); + try { + serverConnection = new Socket(serverName, serverPort); + serverConnection.setKeepAlive(true); + serverIsActive = true; + serverConnection.setSoTimeout(5000); + serverStream = new PrintWriter(serverConnection.getOutputStream(), true); + Logger.msg("Connected to proxy server on "+serverName+":"+serverPort); + manager.resubscribe(this); + } catch (Exception e) { + Logger.msg(3, "Could not connect to proxy server. Retrying in 5s"); + try { Thread.sleep(5000); } catch (InterruptedException ex) { } + serverStream = null; + serverConnection = null; + serverIsActive = false; + } + } + + public void shutdown() { + Logger.msg("Proxy Client: flagging shutdown."); + listening = false; + } + + /** + * @param sub + */ + public void sendMessage(ProxyMessage sub) { + if (serverStream != null) + serverStream.println(sub); + } + +} + -- cgit v1.2.3