From 0ed2c1124cf1b9e49a2ec1fa0126a8df09f9e758 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Tue, 7 Oct 2014 09:18:11 +0200 Subject: Repackage to org.cristalise --- .../cristalise/kernel/entity/proxy/AgentProxy.java | 323 +++++++++++++++++ .../kernel/entity/proxy/DomainPathSubscriber.java | 38 ++ .../cristalise/kernel/entity/proxy/ItemProxy.java | 390 +++++++++++++++++++++ .../kernel/entity/proxy/MemberSubscription.java | 141 ++++++++ .../kernel/entity/proxy/ProxyClientConnection.java | 208 +++++++++++ .../kernel/entity/proxy/ProxyManager.java | 277 +++++++++++++++ .../kernel/entity/proxy/ProxyMessage.java | 129 +++++++ .../kernel/entity/proxy/ProxyObserver.java | 47 +++ .../kernel/entity/proxy/ProxyServer.java | 129 +++++++ .../kernel/entity/proxy/ProxyServerConnection.java | 145 ++++++++ .../kernel/entity/proxy/package-info.java | 76 ++++ 11 files changed, 1903 insertions(+) create mode 100644 src/main/java/org/cristalise/kernel/entity/proxy/AgentProxy.java create mode 100644 src/main/java/org/cristalise/kernel/entity/proxy/DomainPathSubscriber.java create mode 100644 src/main/java/org/cristalise/kernel/entity/proxy/ItemProxy.java create mode 100644 src/main/java/org/cristalise/kernel/entity/proxy/MemberSubscription.java create mode 100644 src/main/java/org/cristalise/kernel/entity/proxy/ProxyClientConnection.java create mode 100644 src/main/java/org/cristalise/kernel/entity/proxy/ProxyManager.java create mode 100644 src/main/java/org/cristalise/kernel/entity/proxy/ProxyMessage.java create mode 100644 src/main/java/org/cristalise/kernel/entity/proxy/ProxyObserver.java create mode 100644 src/main/java/org/cristalise/kernel/entity/proxy/ProxyServer.java create mode 100644 src/main/java/org/cristalise/kernel/entity/proxy/ProxyServerConnection.java create mode 100644 src/main/java/org/cristalise/kernel/entity/proxy/package-info.java (limited to 'src/main/java/org/cristalise/kernel/entity/proxy') diff --git a/src/main/java/org/cristalise/kernel/entity/proxy/AgentProxy.java b/src/main/java/org/cristalise/kernel/entity/proxy/AgentProxy.java new file mode 100644 index 0000000..4b05764 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/entity/proxy/AgentProxy.java @@ -0,0 +1,323 @@ +/** + * This file is part of the CRISTAL-iSE kernel. + * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved. + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; either version 3 of the License, or (at + * your option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * + * http://www.fsf.org/licensing/licenses/lgpl.html + */ +package org.cristalise.kernel.entity.proxy; + +import java.util.Date; +import java.util.Iterator; + +import org.cristalise.kernel.common.AccessRightsException; +import org.cristalise.kernel.common.InvalidCollectionModification; +import org.cristalise.kernel.common.InvalidDataException; +import org.cristalise.kernel.common.InvalidTransitionException; +import org.cristalise.kernel.common.ObjectAlreadyExistsException; +import org.cristalise.kernel.common.ObjectNotFoundException; +import org.cristalise.kernel.common.PersistencyException; +import org.cristalise.kernel.entity.Agent; +import org.cristalise.kernel.entity.AgentHelper; +import org.cristalise.kernel.entity.C2KLocalObject; +import org.cristalise.kernel.entity.agent.Job; +import org.cristalise.kernel.lifecycle.instance.predefined.PredefinedStep; +import org.cristalise.kernel.lookup.AgentPath; +import org.cristalise.kernel.lookup.DomainPath; +import org.cristalise.kernel.lookup.InvalidItemPathException; +import org.cristalise.kernel.lookup.ItemPath; +import org.cristalise.kernel.lookup.Path; +import org.cristalise.kernel.persistency.outcome.OutcomeValidator; +import org.cristalise.kernel.persistency.outcome.Schema; +import org.cristalise.kernel.process.Gateway; +import org.cristalise.kernel.process.auth.Authenticator; +import org.cristalise.kernel.scripting.ErrorInfo; +import org.cristalise.kernel.scripting.Script; +import org.cristalise.kernel.scripting.ScriptErrorException; +import org.cristalise.kernel.scripting.ScriptingEngineException; +import org.cristalise.kernel.utils.LocalObjectLoader; +import org.cristalise.kernel.utils.Logger; + + +/****************************************************************************** + * It is a wrapper for the connection and communication with Agent + * It caches data loaded from the Agent to reduce communication + * + * @version $Revision: 1.37 $ $Date: 2005/10/05 07:39:36 $ + * @author $Author: abranson $ + ******************************************************************************/ +public class AgentProxy extends ItemProxy +{ + + AgentPath mAgentPath; + String mAgentName; + Authenticator auth; + /************************************************************************** + * Creates an AgentProxy without cache and change notification + **************************************************************************/ + protected AgentProxy( org.omg.CORBA.Object ior, + AgentPath agentPath) + throws ObjectNotFoundException + { + super(ior, agentPath); + mAgentPath = agentPath; + } + + public Authenticator getAuthObj() { + return auth; + } + + public void setAuthObj(Authenticator auth) { + this.auth = auth; + } + + @Override + public Agent narrow() throws ObjectNotFoundException + { + try { + return AgentHelper.narrow(mIOR); + } catch (org.omg.CORBA.BAD_PARAM ex) { } + throw new ObjectNotFoundException("CORBA Object was not an Agent, or the server is down."); + } + + /** + * Standard execution of jobs. Note that this method should always be the one used from clients - all execution + * parameters are taken from the job where they're probably going to be correct. + * + * @param job + * @throws AccessRightsException + * @throws InvalidDataException + * @throws InvalidTransitionException + * @throws ObjectNotFoundException + * @throws PersistencyException + * @throws ObjectAlreadyExistsException + * @throws ScriptErrorException + * @throws InvalidCollectionModification + */ + public String execute(Job job) + throws AccessRightsException, + InvalidDataException, + InvalidTransitionException, + ObjectNotFoundException, + PersistencyException, + ObjectAlreadyExistsException, + ScriptErrorException, InvalidCollectionModification + { + ItemProxy item = Gateway.getProxyManager().getProxy(job.getItemPath()); + OutcomeValidator validator = null; + Date startTime = new Date(); + Logger.msg(3, "AgentProxy - executing "+job.getStepPath()+" for "+mAgentPath.getAgentName()); + // get the outcome validator if present + if (job.hasOutcome()) + { + String schemaName = job.getSchemaName(); + int schemaVersion = job.getSchemaVersion(); + + Logger.msg(5, "AgentProxy - fetching schema "+schemaName+"_"+schemaVersion+" for validation"); + // retrieve schema + Schema schema = LocalObjectLoader.getSchema(schemaName, schemaVersion); + + if (schema == null) + throw new InvalidDataException("Job references outcome type "+schemaName+" version "+schemaVersion+" that does not exist in this centre."); + + try { + validator = OutcomeValidator.getValidator(schema); + } catch (Exception e) { + throw new InvalidDataException("Could not create validator: "+e.getMessage()); + } + } + + if(job.hasScript()) { + Logger.msg(3, "AgentProxy - executing script "+job.getScriptName()+" v"+job.getScriptVersion()); + try { + + // pre-validate outcome from script if there is one + if (job.getOutcomeString()!= null && validator != null) { + Logger.msg(5, "AgentProxy - validating outcome before script execution"); + String error = validator.validate(job.getOutcomeString()); + if (error.length() > 0) { + Logger.error("Outcome not valid: \n " + error); + throw new InvalidDataException(error); + } + } + + // load script + ErrorInfo scriptErrors = (ErrorInfo)callScript(item, job); + String errorString = scriptErrors.toString(); + if (scriptErrors.getFatal()) { + Logger.msg(3, "AgentProxy - fatal script error"); + throw new ScriptErrorException(scriptErrors); + } + if (errorString.length() > 0) + Logger.warning("Script errors: "+errorString); + } catch (ScriptingEngineException ex) { + Logger.error(ex); + throw new InvalidDataException(ex.getMessage()); + } + } + + if (job.isOutcomeSet()) { + Logger.msg(3, "AgentProxy - validating outcome"); + String error = validator.validate(job.getOutcomeString()); + if (error.length() > 0) + throw new InvalidDataException(error); + } + + job.setAgentPath(mAgentPath); + Logger.msg(3, "AgentProxy - submitting job to item proxy"); + String result = item.requestAction(job); + if (Logger.doLog(3)) { + Date timeNow = new Date(); + long secsNow = (timeNow.getTime()-startTime.getTime())/1000; + Logger.msg(3, "Execution took "+secsNow+" seconds"); + } + + return result; + } + + private Object callScript(ItemProxy item, Job job) throws ScriptingEngineException { + Script script = new Script(item, this, job); + return script.execute(); + } + + public String execute(ItemProxy item, String predefStep, C2KLocalObject obj) + throws AccessRightsException, + InvalidDataException, + InvalidTransitionException, + ObjectNotFoundException, + PersistencyException, + ObjectAlreadyExistsException, InvalidCollectionModification + { + String param; + try { + param = marshall(obj); + } catch (Exception ex) { + Logger.error(ex); + throw new InvalidDataException("Error on marshall"); + } + return execute(item, predefStep, param); + } + + /** + * Multi-parameter execution. Wraps parameters up in a PredefinedStepOutcome + * if the schema of the requested step is such. + * + * @param item The item on which to execute the step + * @param predefStep The step name to run + * @param params An array of parameters to pass to the step. See each step's + * documentation for its required parameters + * + * @return The outcome after processing. May have been altered by the step. + * + * @throws AccessRightsException The agent was not allowed to execute this step + * @throws InvalidDataException The parameters supplied were incorrect + * @throws InvalidTransitionException The step wasn't available + * @throws ObjectNotFoundException Thrown by some steps that try to locate additional objects + * @throws PersistencyException Problem writing or reading the database + * @throws ObjectAlreadyExistsException Thrown by steps that create additional object + * @throws InvalidCollectionModification + */ + public String execute(ItemProxy item, String predefStep, String[] params) + throws AccessRightsException, + InvalidDataException, + InvalidTransitionException, + ObjectNotFoundException, + PersistencyException, + ObjectAlreadyExistsException, InvalidCollectionModification + { + String schemaName = PredefinedStep.getPredefStepSchemaName(predefStep); + String param; + if (schemaName.equals("PredefinedStepOutcome")) + param = PredefinedStep.bundleData(params); + else + param = params[0]; + + return item.getItem().requestAction(mAgentPath.getSystemKey(), "workflow/predefined/"+predefStep, PredefinedStep.DONE, param); + } + + /** + * Single parameter execution + * + * @see #execute(ItemProxy, String, String[]) + * + * @param item + * @param predefStep + * @param param + * @return + * @throws AccessRightsException + * @throws InvalidDataException + * @throws InvalidTransitionException + * @throws ObjectNotFoundException + * @throws PersistencyException + * @throws ObjectAlreadyExistsException + * @throws InvalidCollectionModification + */ + + public String execute(ItemProxy item, String predefStep, String param) + throws AccessRightsException, + InvalidDataException, + InvalidTransitionException, + ObjectNotFoundException, + PersistencyException, + ObjectAlreadyExistsException, InvalidCollectionModification + { + return execute(item, predefStep, new String[] {param }); + } + + /** Wrappers for scripts */ + public String marshall(Object obj) throws Exception { + return Gateway.getMarshaller().marshall(obj); + } + + public Object unmarshall(String obj) throws Exception { + return Gateway.getMarshaller().unmarshall(obj); + } + + /** Let scripts resolve items */ + public ItemProxy searchItem(String name) throws ObjectNotFoundException { + Iterator results = Gateway.getLookup().search(new DomainPath(""),name); + + Path returnPath = null; + if (!results.hasNext()) + throw new ObjectNotFoundException(name); + + while(results.hasNext()) { + Path nextMatch = results.next(); + if (returnPath != null && nextMatch.getUUID() != null && !returnPath.getUUID().equals(nextMatch.getUUID())) + throw new ObjectNotFoundException("Too many items with that name"); + returnPath = nextMatch; + } + + return Gateway.getProxyManager().getProxy(returnPath); + } + + public ItemProxy getItem(String itemPath) throws ObjectNotFoundException { + return (getItem(new DomainPath(itemPath))); + } + + @Override + public AgentPath getPath() { + return mAgentPath; + } + + public ItemProxy getItem(Path itemPath) throws ObjectNotFoundException { + return Gateway.getProxyManager().getProxy(itemPath); + } + + public ItemProxy getItemByUUID(String uuid) throws ObjectNotFoundException, InvalidItemPathException { + return Gateway.getProxyManager().getProxy(new ItemPath(uuid)); + } +} diff --git a/src/main/java/org/cristalise/kernel/entity/proxy/DomainPathSubscriber.java b/src/main/java/org/cristalise/kernel/entity/proxy/DomainPathSubscriber.java new file mode 100644 index 0000000..bce1fa7 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/entity/proxy/DomainPathSubscriber.java @@ -0,0 +1,38 @@ +/** + * This file is part of the CRISTAL-iSE kernel. + * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved. + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; either version 3 of the License, or (at + * your option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * + * http://www.fsf.org/licensing/licenses/lgpl.html + */ +package org.cristalise.kernel.entity.proxy; + +import org.cristalise.kernel.lookup.DomainPath; + +/************************************************************************** + * + * $Revision: 1.1 $ + * $Date: 2004/02/05 16:11:57 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +public interface DomainPathSubscriber { + + public void pathAdded(DomainPath path); + public void pathRemoved(DomainPath path); +} diff --git a/src/main/java/org/cristalise/kernel/entity/proxy/ItemProxy.java b/src/main/java/org/cristalise/kernel/entity/proxy/ItemProxy.java new file mode 100644 index 0000000..326da36 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/entity/proxy/ItemProxy.java @@ -0,0 +1,390 @@ +/** + * This file is part of the CRISTAL-iSE kernel. + * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved. + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; either version 3 of the License, or (at + * your option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * + * http://www.fsf.org/licensing/licenses/lgpl.html + */ +package org.cristalise.kernel.entity.proxy; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; + +import org.cristalise.kernel.collection.Collection; +import org.cristalise.kernel.collection.CollectionArrayList; +import org.cristalise.kernel.common.AccessRightsException; +import org.cristalise.kernel.common.InvalidCollectionModification; +import org.cristalise.kernel.common.InvalidDataException; +import org.cristalise.kernel.common.InvalidTransitionException; +import org.cristalise.kernel.common.ObjectAlreadyExistsException; +import org.cristalise.kernel.common.ObjectNotFoundException; +import org.cristalise.kernel.common.PersistencyException; +import org.cristalise.kernel.entity.C2KLocalObject; +import org.cristalise.kernel.entity.Item; +import org.cristalise.kernel.entity.ItemHelper; +import org.cristalise.kernel.entity.agent.Job; +import org.cristalise.kernel.entity.agent.JobArrayList; +import org.cristalise.kernel.lifecycle.instance.CompositeActivity; +import org.cristalise.kernel.lifecycle.instance.Workflow; +import org.cristalise.kernel.lookup.AgentPath; +import org.cristalise.kernel.lookup.ItemPath; +import org.cristalise.kernel.persistency.ClusterStorage; +import org.cristalise.kernel.persistency.outcome.Viewpoint; +import org.cristalise.kernel.process.Gateway; +import org.cristalise.kernel.property.Property; +import org.cristalise.kernel.property.PropertyArrayList; +import org.cristalise.kernel.utils.CastorXMLUtility; +import org.cristalise.kernel.utils.Logger; +import org.exolab.castor.mapping.MappingException; +import org.exolab.castor.xml.MarshalException; +import org.exolab.castor.xml.ValidationException; + + +/****************************************************************************** + * It is a wrapper for the connection and communication with Item + * It caches data loaded from the Item to reduce communication + * + * @version $Revision: 1.25 $ $Date: 2005/05/10 11:40:09 $ + * @author $Author: abranson $ + ******************************************************************************/ +public class ItemProxy +{ + + protected Item mItem = null; + protected ItemPath mItemPath; + protected org.omg.CORBA.Object mIOR; + private final HashMap, ProxyObserver> + mSubscriptions; + + /************************************************************************** + * + **************************************************************************/ + protected ItemProxy( org.omg.CORBA.Object ior, + ItemPath itemPath) + { + Logger.msg(8, "ItemProxy::initialise() - Initialising item proxy " +itemPath); + + mIOR = ior; + mItemPath = itemPath; + mSubscriptions = new HashMap, ProxyObserver>(); + + } + + public ItemPath getPath() { + return mItemPath; + } + + protected Item getItem() throws ObjectNotFoundException { + if (mItem == null) + mItem = narrow(); + return mItem; + } + + public Item narrow() throws ObjectNotFoundException + { + try { + return ItemHelper.narrow(mIOR); + } catch (org.omg.CORBA.BAD_PARAM ex) { } + throw new ObjectNotFoundException("CORBA Object was not an Item, or the server is down."); + } + + public void initialise( AgentPath agentId, + PropertyArrayList itemProps, + CompositeActivity workflow, + CollectionArrayList colls + ) + throws AccessRightsException, InvalidDataException, PersistencyException, ObjectNotFoundException, MarshalException, ValidationException, IOException, MappingException, InvalidCollectionModification + { + Logger.msg(7, "ItemProxy::initialise - started"); + CastorXMLUtility xml = Gateway.getMarshaller(); + if (itemProps == null) throw new InvalidDataException("No initial properties supplied"); + String propString = xml.marshall(itemProps); + String wfString = ""; + if (workflow != null) wfString = xml.marshall(workflow); + String collString = ""; + if (colls != null) collString = xml.marshall(colls); + + getItem().initialise( agentId.getSystemKey(), propString, wfString, collString); + } + + public void setProperty(AgentProxy agent, String name, String value) + throws AccessRightsException, + PersistencyException, InvalidDataException + { + String[] params = new String[2]; + params[0] = name; + params[1] = value; + try { + agent.execute(this, "WriteProperty", params); + } catch (AccessRightsException e) { + throw (e); + } catch (PersistencyException e) { + throw (e); + } catch (InvalidDataException e) { + throw (e); + } catch (Exception e) { + Logger.error(e); + throw new PersistencyException("Could not store property"); + } + } + + /** + * @throws InvalidCollectionModification + * + **************************************************************************/ + public String requestAction( Job thisJob ) + throws AccessRightsException, + InvalidTransitionException, + ObjectNotFoundException, + InvalidDataException, + PersistencyException, + ObjectAlreadyExistsException, + InvalidCollectionModification + { + String outcome = thisJob.getOutcomeString(); + // check fields that should have been filled in + if (outcome==null) + if (thisJob.isOutcomeRequired()) + throw new InvalidDataException("Outcome is required."); + else + outcome=""; + + if (thisJob.getAgentPath() == null) + throw new InvalidDataException("No Agent specified."); + + Logger.msg(7, "ItemProxy - executing "+thisJob.getStepPath()+" for "+thisJob.getAgentName()); + return getItem().requestAction (thisJob.getAgentPath().getSystemKey(), thisJob.getStepPath(), + thisJob.getTransition().getId(), outcome); + } + + /************************************************************************** + * + **************************************************************************/ + private ArrayList getJobList(AgentPath agentPath, boolean filter) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException + { + JobArrayList thisJobList; + try { + String jobs = getItem().queryLifeCycle(agentPath.getSystemKey(), filter); + thisJobList = (JobArrayList)Gateway.getMarshaller().unmarshall(jobs); + } + catch (Exception e) { + Logger.error(e); + throw new PersistencyException("Exception::ItemProxy::getJobList() - Cannot unmarshall the jobs"); + } + return thisJobList.list; + } + + public ArrayList getJobList(AgentProxy agent) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException + { + return getJobList(agent.getPath(), true); + } + + private Job getJobByName(String actName, AgentPath agent) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException { + + ArrayList jobList = getJobList(agent, true); + for (Job job : jobList) { + if (job.getStepName().equals(actName) && job.hasOutcome()) + return job; + } + return null; + + } + + public Collection getCollection(String collName) throws ObjectNotFoundException { + return (Collection)getObject(ClusterStorage.COLLECTION+"/"+collName+"/last"); + } + + public Workflow getWorkflow() throws ObjectNotFoundException { + return (Workflow)getObject(ClusterStorage.LIFECYCLE+"/workflow"); + } + + public Viewpoint getViewpoint(String schemaName, String viewName) throws ObjectNotFoundException { + return (Viewpoint)getObject(ClusterStorage.VIEWPOINT+"/"+schemaName+"/"+viewName); + } + + public Job getJobByName(String actName, AgentProxy agent) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException { + return getJobByName(actName, agent.getPath()); + } + + /** + * If this is reaped, clear out the cache for it too. + */ + @Override + protected void finalize() throws Throwable { + Logger.msg(7, "Proxy "+mItemPath+" reaped"); + Gateway.getStorage().clearCache(mItemPath, null); + Gateway.getProxyManager().removeProxy(mItemPath); + super.finalize(); + } + + /************************************************************************** + * + **************************************************************************/ + public String queryData( String path ) + throws ObjectNotFoundException + { + + try { + Logger.msg(7, "EntityProxy.queryData() - "+mItemPath+"/"+path); + if (path.endsWith("all")) { + Logger.msg(7, "EntityProxy.queryData() - listing contents"); + String[] result = Gateway.getStorage().getClusterContents(mItemPath, path.substring(0, path.length()-3)); + StringBuffer retString = new StringBuffer(); + for (int i = 0; i < result.length; i++) { + retString.append(result[i]); + if (i"+e.getMessage()+""; + } + } + + public String[] getContents( String path ) throws ObjectNotFoundException { + try { + return Gateway.getStorage().getClusterContents(mItemPath, path.substring(0, path.length())); + } catch (PersistencyException e) { + throw new ObjectNotFoundException(e.toString()); + } + } + + + /************************************************************************** + * + **************************************************************************/ + public C2KLocalObject getObject( String xpath ) + throws ObjectNotFoundException + { + // load from storage, falling back to proxy loader if not found in others + try + { + return Gateway.getStorage().get( mItemPath, xpath , null); + } + catch( PersistencyException ex ) + { + Logger.msg(4, "Exception loading object :"+mItemPath+"/"+xpath); + throw new ObjectNotFoundException( ex.toString() ); + } + } + + + + public String getProperty( String name ) + throws ObjectNotFoundException + { + Logger.msg(5, "Get property "+name+" from item "+mItemPath); + Property prop = (Property)getObject("Property/"+name); + try + { + return prop.getValue(); + } + catch (NullPointerException ex) + { + throw new ObjectNotFoundException(); + } + } + + public String getName() + { + try { + return getProperty("Name"); + } catch (ObjectNotFoundException ex) { + return null; + } + } + + + + + /************************************************************************** + * Subscription methods + **************************************************************************/ + + public void subscribe(MemberSubscription newSub) { + + newSub.setSubject(this); + synchronized (this){ + mSubscriptions.put( newSub, newSub.getObserver() ); + } + new Thread(newSub).start(); + Logger.msg(7, "Subscribed "+newSub.getObserver().getClass().getName()+" for "+newSub.interest); + } + + public void unsubscribe(ProxyObserver observer) + { + synchronized (this){ + for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { + MemberSubscription thisSub = e.next(); + if (mSubscriptions.get( thisSub ) == observer) { + e.remove(); + Logger.msg(7, "Unsubscribed "+observer.getClass().getName()); + } + } + } + } + + public void dumpSubscriptions(int logLevel) { + if (mSubscriptions.size() == 0) return; + Logger.msg(logLevel, "Subscriptions to proxy "+mItemPath+":"); + synchronized(this) { + for (MemberSubscription element : mSubscriptions.keySet()) { + ProxyObserver obs = element.getObserver(); + if (obs != null) + Logger.msg(logLevel, " "+element.getObserver().getClass().getName()+" subscribed to "+element.interest); + else + Logger.msg(logLevel, " Phantom subscription to "+element.interest); + } + } + } + + public void notify(ProxyMessage message) { + Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mItemPath); + synchronized (this){ + if (Gateway.getProxyServer()== null || !message.getServer().equals(Gateway.getProxyServer().getServerName())) + Gateway.getStorage().clearCache(mItemPath, message.getPath()); + for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { + MemberSubscription newSub = e.next(); + if (newSub.getObserver() == null) { // phantom + Logger.msg(4, "Removing phantom subscription to "+newSub.interest); + e.remove(); + } + else + newSub.update(message.getPath(), message.getState()); + } + } + } +} diff --git a/src/main/java/org/cristalise/kernel/entity/proxy/MemberSubscription.java b/src/main/java/org/cristalise/kernel/entity/proxy/MemberSubscription.java new file mode 100644 index 0000000..2873eb8 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/entity/proxy/MemberSubscription.java @@ -0,0 +1,141 @@ +/** + * This file is part of the CRISTAL-iSE kernel. + * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved. + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; either version 3 of the License, or (at + * your option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * + * http://www.fsf.org/licensing/licenses/lgpl.html + */ +package org.cristalise.kernel.entity.proxy; +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.StringTokenizer; + +import org.cristalise.kernel.common.ObjectNotFoundException; +import org.cristalise.kernel.entity.C2KLocalObject; +import org.cristalise.kernel.utils.Logger; + + +public class MemberSubscription implements Runnable { + public static final String ERROR = "Error"; + public static final String END = "theEND"; + + ItemProxy subject; + String interest; + // keep the subscriber by weak reference, so it is not kept from the garbage collector if no longer used + WeakReference> observerReference; + ArrayList contents = new ArrayList(); + boolean preLoad; + + public MemberSubscription(ProxyObserver observer, String interest, boolean preLoad) { + setObserver(observer); + this.interest = interest; + this.preLoad = preLoad; + } + + @Override + public void run() { + Thread.currentThread().setName("Member Subscription: "+subject.getPath()+":"+interest); + if (preLoad) loadChildren(); + } + + private void loadChildren() { + C newMember; + ProxyObserver observer = getObserver(); + if (observer == null) return; //reaped + try { + // fetch contents of path + String children = subject.queryData(interest+"/all"); + StringTokenizer tok = new StringTokenizer(children, ","); + ArrayList newContents = new ArrayList(); + while (tok.hasMoreTokens()) + newContents.add(tok.nextToken()); + + // look to see what's new + for (String newChild: newContents) { + + // load child object + try { + newMember = (C)subject.getObject(interest+"/"+newChild); + contents.remove(newChild); + observer.add(newMember); + } catch (ObjectNotFoundException ex) { + observer.control(ERROR, "Listed member "+newChild+" was not found."); + } catch (ClassCastException ex) { + Logger.error(ex); + observer.control(ERROR, "Listed member "+newChild+" was the wrong type."); + } + } + // report what's left in old contents as deleted + for (String oldChild: contents) { + observer.remove(interest+"/"+oldChild); + } + //replace contents arraylist + contents = newContents; + //report that we're done + observer.control(END, null); + } catch (Exception ex) { + observer.control(ERROR, "Query on "+interest+" failed with "+ex.getMessage()); + } + } + + public boolean isRelevant(String path) { + Logger.msg(7, "Checking relevance of "+path+" to "+interest); + return (path.startsWith(interest)); + } + + public void update(String path, boolean deleted) { + ProxyObserver observer = getObserver(); + if (observer == null) return; //reaped + Logger.msg(7, "Processing proxy message path "+path +" for "+observer+". Interest: "+interest+" Was Deleted:"+deleted); + if (!path.startsWith(interest)) // doesn't concern us + return; + + if (path.equals(interest)) // refresh contents + loadChildren(); + else { + String name = path.substring(interest.length()); + if (deleted) { + Logger.msg(4, "Removing "+path); + contents.remove(name); + observer.remove(name); + } + else { + try { + C newMember = (C)subject.getObject(path); + Logger.msg(4, "Adding "+path); + contents.add(name); + observer.add(newMember); + } catch (ObjectNotFoundException e) { + Logger.error("Member Subscription: could not load "+path); + Logger.error(e); + } + } + } + } + + public void setObserver(ProxyObserver observer) { + observerReference = new WeakReference>(observer); + } + + public void setSubject(ItemProxy subject) { + this.subject = subject; + } + + public ProxyObserver getObserver() { + return observerReference.get(); + } +} + diff --git a/src/main/java/org/cristalise/kernel/entity/proxy/ProxyClientConnection.java b/src/main/java/org/cristalise/kernel/entity/proxy/ProxyClientConnection.java new file mode 100644 index 0000000..9f65afa --- /dev/null +++ b/src/main/java/org/cristalise/kernel/entity/proxy/ProxyClientConnection.java @@ -0,0 +1,208 @@ +/** + * This file is part of the CRISTAL-iSE kernel. + * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved. + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; either version 3 of the License, or (at + * your option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * + * http://www.fsf.org/licensing/licenses/lgpl.html + */ +package org.cristalise.kernel.entity.proxy; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.InterruptedIOException; +import java.io.PrintWriter; +import java.net.Socket; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.Iterator; + +import org.cristalise.kernel.common.InvalidDataException; +import org.cristalise.kernel.lookup.ItemPath; +import org.cristalise.kernel.process.Gateway; +import org.cristalise.kernel.utils.Logger; +import org.cristalise.kernel.utils.server.SocketHandler; + + +/************************************************************************** + * + * $Revision: 1.18 $ + * $Date: 2005/05/10 11:40:09 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +public class ProxyClientConnection implements SocketHandler { + + Socket clientSocket = null; + static int clientId = -1; + int thisClientId; + ArrayList subscribedItems; + PrintWriter response; + BufferedReader request; + boolean closing = false; + + public ProxyClientConnection() { + super(); + thisClientId = ++clientId; + Gateway.getProxyServer().registerProxyClient(this); + Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready."); + } + + + @Override + public String getName() { + return "Proxy Client Connection"; + } + + @Override + public boolean isBusy() { + return clientSocket != null; + } + + @Override + public synchronized void setSocket(Socket newSocket) { + try { + Logger.msg(1, "Proxy Client Connection "+thisClientId+" connect from "+newSocket.getInetAddress()+":"+newSocket.getPort()); + newSocket.setSoTimeout(500); + clientSocket = newSocket; + response = new PrintWriter(clientSocket.getOutputStream(), true); + subscribedItems = new ArrayList(); + } catch (SocketException ex) { + Logger.msg("Could not set socket timeout:"); + Logger.error(ex); + closeSocket(); + } catch (IOException ex) { + Logger.msg("Could not setup output stream:"); + Logger.error(ex); + closeSocket(); + } + } + + /** + * Main loop. Reads proxy commands from the client and acts on them. + */ + @Override + public void run() { + Thread.currentThread().setName("Proxy Client Connection: "+clientSocket.getInetAddress()); + Logger.msg(7, "ProxyClientConnection "+thisClientId+" - Setting up proxy client connection with "+clientSocket.getInetAddress()); + try { + request = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); + String input = null; + ProxyMessage thisMessage; + while (clientSocket != null) { + try { + input = request.readLine(); + Logger.msg(9, "ProxyClientConnection "+thisClientId+" - received "+input); + thisMessage = new ProxyMessage(input); + processMessage(thisMessage); + } catch (InterruptedIOException ex) { //timeout + } catch (InvalidDataException ex) { // invalid proxy message + Logger.error("ProxyClientConnection "+thisClientId+" - Invalid proxy message: "+input); + } + + } + } catch (IOException ex) { + if (!closing) + Logger.error("ProxyClientConnection "+thisClientId+" - Error reading from socket."); + } + closeSocket(); + Logger.msg(1, "ProxyClientConnection "+thisClientId+" closed."); + } + + private void processMessage(ProxyMessage message) throws InvalidDataException { + + // proxy disconnection + if (message.getPath().equals(ProxyMessage.BYEPATH)) { + Logger.msg(7, "ProxyClientConnection "+thisClientId+" disconnecting"); + closeSocket(); + } + + // proxy checking connection + else if (message.getPath().equals(ProxyMessage.PINGPATH)) + response.println(ProxyMessage.pingMessage); + + // new subscription to entity changes + else if (message.getPath().equals(ProxyMessage.ADDPATH)) { + Logger.msg(7, "ProxyClientConnection "+thisClientId+" subscribed to "+message.getItemPath()); + synchronized (subscribedItems) { + subscribedItems.add(message.getItemPath()); + } + } + + // remove of subscription to entity changes + else if (message.getPath().equals(ProxyMessage.DELPATH)) { + synchronized (subscribedItems) { + subscribedItems.remove(message.getItemPath()); + } + Logger.msg(7, "ProxyClientConnection "+thisClientId+" unsubscribed from "+message.getItemPath()); + } + + else // unknown message + Logger.error("ProxyClientConnection "+thisClientId+" - Unknown message type: "+message); + + } + + public synchronized void sendMessage(ProxyMessage message) { + if (clientSocket==null) return; // idle + boolean relevant = message.getItemPath() == null; + synchronized (subscribedItems) { + for (Iterator iter = subscribedItems.iterator(); iter.hasNext() && !relevant;) { + ItemPath thisKey = iter.next(); + if (thisKey.equals(message.getItemPath())) + relevant = true; + } + } + if (!relevant) return; // not for our client + + response.println(message); + } + + @Override + public void shutdown() { + if (isBusy()) { + closing = true; + Logger.msg("ProxyClientConnection "+thisClientId+" closing."); + closeSocket(); + } + } + + @Override + public String toString() { + if (clientSocket == null) return thisClientId+": idle"; + else return thisClientId+": "+clientSocket.getInetAddress(); + } + + private synchronized void closeSocket() { + if (clientSocket==null) return; + try { + request.close(); + response.close(); + clientSocket.close(); + } catch (IOException e) { + Logger.error("ProxyClientConnection "+thisClientId+" - Could not close socket."); + Logger.error(e); + } + synchronized (subscribedItems) { + subscribedItems = null; + } + + clientSocket = null; + + } + +} diff --git a/src/main/java/org/cristalise/kernel/entity/proxy/ProxyManager.java b/src/main/java/org/cristalise/kernel/entity/proxy/ProxyManager.java new file mode 100644 index 0000000..6f5bde3 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/entity/proxy/ProxyManager.java @@ -0,0 +1,277 @@ +/** + * This file is part of the CRISTAL-iSE kernel. + * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved. + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; either version 3 of the License, or (at + * your option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * + * http://www.fsf.org/licensing/licenses/lgpl.html + */ +package org.cristalise.kernel.entity.proxy; + +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.Iterator; + +import org.cristalise.kernel.common.InvalidDataException; +import org.cristalise.kernel.common.ObjectNotFoundException; +import org.cristalise.kernel.lookup.AgentPath; +import org.cristalise.kernel.lookup.DomainPath; +import org.cristalise.kernel.lookup.ItemPath; +import org.cristalise.kernel.lookup.Path; +import org.cristalise.kernel.persistency.ClusterStorage; +import org.cristalise.kernel.process.Gateway; +import org.cristalise.kernel.property.Property; +import org.cristalise.kernel.utils.Logger; +import org.cristalise.kernel.utils.SoftCache; + + + +public class ProxyManager +{ + SoftCache proxyPool = new SoftCache(50); + HashMap treeSubscribers = new HashMap(); + HashMap connections = new HashMap(); + + /** + * Create a proxy manager to listen for proxy events and reap unused proxies + */ + public ProxyManager() + { + Logger.msg(5, "ProxyManager - Starting....."); + + Iterator servers = Gateway.getLookup().search(new DomainPath("/servers"), new Property("Type", "Server", false)); + while(servers.hasNext()) { + Path thisServerResult = servers.next(); + try { + ItemPath thisServerPath = thisServerResult.getItemPath(); + String remoteServer = ((Property)Gateway.getStorage().get(thisServerPath, ClusterStorage.PROPERTY+"/Name", null)).getValue(); + String portStr = ((Property)Gateway.getStorage().get(thisServerPath, ClusterStorage.PROPERTY+"/ProxyPort", null)).getValue(); + int remotePort = Integer.parseInt(portStr); + connectToProxyServer(remoteServer, remotePort); + + } catch (Exception ex) { + Logger.error("Exception retrieving proxy server connection data for "+thisServerResult); + Logger.error(ex); + } + } + } + + public void connectToProxyServer(String name, int port) { + ProxyServerConnection oldConn = connections.get(name); + if (oldConn != null) + oldConn.shutdown(); + connections.put(name, new ProxyServerConnection(name, port, this)); + } + + + protected void resubscribe(ProxyServerConnection conn) { + synchronized (proxyPool) { + for (ItemPath key : proxyPool.keySet()) { + ProxyMessage sub = new ProxyMessage(key, ProxyMessage.ADDPATH, false); + Logger.msg(5, "Subscribing to item "+key); + conn.sendMessage(sub); + } + } + } + + /** + * @param sub + */ + private void sendMessage(ProxyMessage sub) { + for (ProxyServerConnection element : connections.values()) { + element.sendMessage(sub); + } + + } + + public void shutdown() { + Logger.msg("ProxyManager.shutdown() - flagging shutdown of server connections"); + for (ProxyServerConnection element : connections.values()) { + element.shutdown(); + } + } + + protected void processMessage(ProxyMessage thisMessage) throws InvalidDataException { + if (Logger.doLog(9)) Logger.msg(9, thisMessage.toString()); + + if (thisMessage.getPath().equals(ProxyMessage.PINGPATH)) // ping response + return; + + if (thisMessage.getItemPath() == null) // must be domain path info + informTreeSubscribers(thisMessage.getState(), thisMessage.getPath()); + else { + // proper proxy message + Logger.msg(5, "Received proxy message: "+thisMessage.toString()); + ItemProxy relevant = proxyPool.get(thisMessage.getItemPath()); + if (relevant == null) + Logger.warning("Received proxy message for sysKey "+thisMessage.getItemPath()+" which we don't have a proxy for."); + else + try { + relevant.notify(thisMessage); + } catch (Throwable ex) { + Logger.error("Error caught notifying proxy listener "+relevant.toString()+" of "+thisMessage.toString()); + Logger.error(ex); + } + } + } + + private void informTreeSubscribers(boolean state, String path) { + DomainPath last = new DomainPath(path); + DomainPath parent; boolean first = true; + synchronized(treeSubscribers) { + while((parent = last.getParent()) != null) { + ArrayList currentKeys = new ArrayList(); + currentKeys.addAll(treeSubscribers.keySet()); + for (DomainPathSubscriber sub : currentKeys) { + DomainPath interest = treeSubscribers.get(sub); + if (interest!= null && interest.equals(parent)) { + if (state == ProxyMessage.ADDED) + sub.pathAdded(last); + else if (first) + sub.pathRemoved(last); + } + } + last = parent; + first = false; + } + } + } + + public void subscribeTree(DomainPathSubscriber sub, DomainPath interest) { + synchronized(treeSubscribers) { + treeSubscribers.put(sub, interest); + } + } + + public void unsubscribeTree(DomainPathSubscriber sub) { + synchronized(treeSubscribers) { + treeSubscribers.remove(sub); + } + } + + /************************************************************************** + * + **************************************************************************/ + private ItemProxy createProxy( org.omg.CORBA.Object ior, + ItemPath itemPath) + throws ObjectNotFoundException + { + + ItemProxy newProxy = null; + + Logger.msg(5, "ProxyManager::creating proxy on Item " + itemPath); + + if( itemPath instanceof AgentPath ) { + newProxy = new AgentProxy(ior, (AgentPath)itemPath); + } + else { + newProxy = new ItemProxy(ior, itemPath); + } + + // subscribe to changes from server + ProxyMessage sub = new ProxyMessage(itemPath, ProxyMessage.ADDPATH, false); + sendMessage(sub); + reportCurrentProxies(9); + return ( newProxy ); + } + + protected void removeProxy( ItemPath itemPath ) + { + ProxyMessage sub = new ProxyMessage(itemPath, ProxyMessage.DELPATH, true); + Logger.msg(5,"ProxyManager.removeProxy() - Unsubscribing to proxy informer for "+itemPath); + sendMessage(sub); + } + + + /************************************************************************** + * Called by the other GetProxy methods. Fills in either the ior or the + * SystemKey + **************************************************************************/ + private ItemProxy getProxy( org.omg.CORBA.Object ior, + ItemPath itemPath) + throws ObjectNotFoundException + { + + synchronized(proxyPool) { + ItemProxy newProxy; + // return it if it exists + newProxy = proxyPool.get(itemPath); + if (newProxy == null) { + // create a new one + newProxy = createProxy(ior, itemPath); + proxyPool.put(itemPath, newProxy); + } + return newProxy; + + } + } + + /************************************************************************** + * ItemProxy getProxy( String ) + * + * Proxy from Alias + **************************************************************************/ + public ItemProxy getProxy( Path path ) + throws ObjectNotFoundException + { + ItemPath itemPath; + if (path instanceof ItemPath) itemPath = (ItemPath)path; + else itemPath = path.getItemPath(); + Logger.msg(8,"ProxyManager::getProxy(" + path.toString() + ")"); + return getProxy( Gateway.getLookup().resolve(itemPath), + itemPath ); + + } + + public AgentProxy getAgentProxy( AgentPath path ) + throws ObjectNotFoundException + { + return (AgentProxy) getProxy(path); + } + + /************************************************************************** + * void reportCurrentProxies() + * + * A utility to Dump the current proxies loaded + **************************************************************************/ + public void reportCurrentProxies(int logLevel) + { + if (!Logger.doLog(logLevel)) return; + Logger.msg(logLevel, "Current proxies: "); + try { + synchronized(proxyPool) { + Iterator i = proxyPool.keySet().iterator(); + + for( int count=0; i.hasNext(); count++ ) + { + ItemPath nextProxy = i.next(); + ItemProxy thisProxy = proxyPool.get(nextProxy); + if (thisProxy != null) + Logger.msg(logLevel, + "" + count + ": " + + proxyPool.get(nextProxy).getClass().getName() + + ": " + nextProxy); + } + } + } catch (ConcurrentModificationException ex) { + Logger.msg(logLevel, "Proxy cache modified. Aborting."); + } + } + + + +} + diff --git a/src/main/java/org/cristalise/kernel/entity/proxy/ProxyMessage.java b/src/main/java/org/cristalise/kernel/entity/proxy/ProxyMessage.java new file mode 100644 index 0000000..61fdcd4 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/entity/proxy/ProxyMessage.java @@ -0,0 +1,129 @@ +/** + * This file is part of the CRISTAL-iSE kernel. + * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved. + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; either version 3 of the License, or (at + * your option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * + * http://www.fsf.org/licensing/licenses/lgpl.html + */ +package org.cristalise.kernel.entity.proxy; + +import java.io.IOException; +import java.net.DatagramPacket; + +import org.cristalise.kernel.common.InvalidDataException; +import org.cristalise.kernel.lookup.InvalidItemPathException; +import org.cristalise.kernel.lookup.ItemPath; + + + +/************************************************************************** + * + * $Revision: 1.11 $ + * $Date: 2005/05/10 11:40:09 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +public class ProxyMessage { + + // special server message paths + public static final String BYEPATH = "bye"; + public static final String ADDPATH = "add"; + public static final String DELPATH = "del"; + public static final String PINGPATH = "ping"; + public static final boolean ADDED = false; + public static final boolean DELETED = true; + + static ProxyMessage byeMessage = new ProxyMessage(null, BYEPATH, ADDED); + static ProxyMessage pingMessage = new ProxyMessage(null, PINGPATH, ADDED); + + private ItemPath itemPath = null; + private String path = ""; + private String server = null; + private boolean state = ADDED; + + public ProxyMessage() { + super(); + } + public ProxyMessage(ItemPath itemPath, String path, boolean state) { + this(); + setItemPath(itemPath); + setPath(path); + setState(state); + } + + public ProxyMessage(String line) throws InvalidDataException, IOException { + if (line == null) + throw new IOException("Null proxy message"); + String[] tok = line.split(":"); + if (tok.length != 2) + throw new InvalidDataException("String '"+line+"' does not constitute a valid proxy message."); + if (tok[0].length() > 0 && !tok[0].equals("tree")) { + try { + itemPath = new ItemPath(tok[0]); + } catch (InvalidItemPathException e) { + throw new InvalidDataException("Item in proxy message "+line+" was not valid"); + } + } + path = tok[1]; + if (path.startsWith("-")) { + state = DELETED; + path = path.substring(1); + } + } + + public ProxyMessage(DatagramPacket packet) throws InvalidDataException, IOException { + this(new String(packet.getData())); + } + + public ItemPath getItemPath() { + return itemPath; + } + + public void setItemPath(ItemPath itemPath) { + this.itemPath = itemPath; + } + + public String getPath() { + return path; + } + + public void setPath(String newPath) { + this.path = newPath; + } + + public boolean getState() { + return state; + } + + public void setState(boolean state) { + this.state = state; + } + + @Override + public String toString() { + return (itemPath==null?"tree":itemPath.getUUID())+":"+(state?"-":"")+path; + } + + public String getServer() { + return server; + } + + public void setServer(String server) { + this.server = server; + } +} diff --git a/src/main/java/org/cristalise/kernel/entity/proxy/ProxyObserver.java b/src/main/java/org/cristalise/kernel/entity/proxy/ProxyObserver.java new file mode 100644 index 0000000..bd21034 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/entity/proxy/ProxyObserver.java @@ -0,0 +1,47 @@ +/** + * This file is part of the CRISTAL-iSE kernel. + * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved. + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; either version 3 of the License, or (at + * your option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * + * http://www.fsf.org/licensing/licenses/lgpl.html + */ +package org.cristalise.kernel.entity.proxy; + +import org.cristalise.kernel.entity.C2KLocalObject; + + + +public interface ProxyObserver +{ + /************************************************************************** + * Subscribed items are broken apart and fed one by one to these methods. + * Replacement after an event is done by feeding the new memberbase with the same id. + * ID could be an XPath? + **************************************************************************/ + public void add(V contents); + + /************************************************************************** + * the 'type' parameter should be an indication of the type of object + * supplied so that the subscriber can associate the call back with + * one of its subscriptions. If we go with an Xpath subscription form, + * then the id will probably be sufficient. + * Should be comparable (substring whatever) with the parameter given to + * the subscribe method of ItemProxy. + **************************************************************************/ + public void remove(String id); + + public void control(String control, String msg); +} diff --git a/src/main/java/org/cristalise/kernel/entity/proxy/ProxyServer.java b/src/main/java/org/cristalise/kernel/entity/proxy/ProxyServer.java new file mode 100644 index 0000000..249ec0f --- /dev/null +++ b/src/main/java/org/cristalise/kernel/entity/proxy/ProxyServer.java @@ -0,0 +1,129 @@ +/** + * This file is part of the CRISTAL-iSE kernel. + * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved. + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; either version 3 of the License, or (at + * your option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * + * http://www.fsf.org/licensing/licenses/lgpl.html + */ +package org.cristalise.kernel.entity.proxy; + +import java.util.ArrayList; +import java.util.concurrent.LinkedBlockingQueue; + +import org.cristalise.kernel.process.Gateway; +import org.cristalise.kernel.utils.Logger; +import org.cristalise.kernel.utils.server.SimpleTCPIPServer; + + +public class ProxyServer implements Runnable { + + // server objects + ArrayList proxyClients; + SimpleTCPIPServer proxyListener = null; + String serverName = null; + boolean keepRunning = true; + LinkedBlockingQueue messageQueue; + + public ProxyServer(String serverName) { + Logger.msg(5, "ProxyManager::initServer - Starting....."); + int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0); + this.serverName = serverName; + this.proxyClients = new ArrayList(); + this.messageQueue = new LinkedBlockingQueue(); + + if (port == 0) { + Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes."); + return; + } + + // set up the proxy server + try { + Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port); + proxyListener = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200); + proxyListener.startListening(); + } catch (Exception ex) { + Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes."); + Logger.error(ex); + } + // start the message queue delivery thread + new Thread(this).start(); + } + + @Override + public void run() { + + while(keepRunning) { + ProxyMessage message = messageQueue.poll(); + if (message != null) { + synchronized(proxyClients) { + for (ProxyClientConnection client : proxyClients) { + client.sendMessage(message); + } + } + } else + try { + synchronized(this) { + if (messageQueue.isEmpty()) wait(); + } + } catch (InterruptedException e) { } + } + + } + + public String getServerName() { + return serverName; + } + + public void sendProxyEvent(ProxyMessage message) { + try { + synchronized(this) { + messageQueue.put(message); + notify(); + } + } catch (InterruptedException e) { } + } + + public void reportConnections(int logLevel) { + synchronized(proxyClients) { + Logger.msg(logLevel, "Currently connected proxy clients:"); + for (ProxyClientConnection client : proxyClients) { + Logger.msg(logLevel, " "+client); + } + } + } + + public void shutdownServer() { + Logger.msg(1, "ProxyManager: Closing Server."); + proxyListener.stopListening(); + synchronized(this) { + keepRunning = false; + notify(); + } + } + + public void registerProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.add(client); + } + } + + public void unRegisterProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.remove(client); + } + } + +} diff --git a/src/main/java/org/cristalise/kernel/entity/proxy/ProxyServerConnection.java b/src/main/java/org/cristalise/kernel/entity/proxy/ProxyServerConnection.java new file mode 100644 index 0000000..1b4fba4 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/entity/proxy/ProxyServerConnection.java @@ -0,0 +1,145 @@ +/** + * This file is part of the CRISTAL-iSE kernel. + * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved. + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; either version 3 of the License, or (at + * your option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * + * http://www.fsf.org/licensing/licenses/lgpl.html + */ +package org.cristalise.kernel.entity.proxy; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.InterruptedIOException; +import java.io.PrintWriter; +import java.net.Socket; + +import org.cristalise.kernel.common.InvalidDataException; +import org.cristalise.kernel.utils.Logger; + + + +public class ProxyServerConnection extends Thread +{ + + public boolean serverIsActive = true; + // proxy client details + String serverName; + int serverPort; + Socket serverConnection; + ProxyManager manager; + // for talking to the proxy server + PrintWriter serverStream; + boolean listening = false; + static boolean isServer = false; + + /** + * Create an entity proxy manager to listen for proxy events and reap unused proxies + */ + public ProxyServerConnection(String host, int port, ProxyManager manager) + { + Logger.msg(5, "ProxyServerConnection - Initialising connection to "+host+":"+port); + serverName = host; + serverPort = port; + this.manager = manager; + listening = true; + start(); + } + + @Override + public void run() { + Thread.currentThread().setName("Proxy Client Connection Listener to "+serverName+":"+serverPort); + while (listening) { + try { + if (serverConnection == null) connect(); + if (serverConnection != null) { + BufferedReader request = new BufferedReader(new InputStreamReader(serverConnection.getInputStream())); + String input = null; + ProxyMessage thisMessage; + while (listening && serverConnection != null) { + try { + input = request.readLine(); + thisMessage = new ProxyMessage(input); + thisMessage.setServer(serverName); + manager.processMessage(thisMessage); + } catch (InterruptedIOException ex) { // timeout - send a ping + sendMessage(ProxyMessage.pingMessage); + } catch (InvalidDataException ex) { // invalid proxy message + if (input != null) + Logger.error("EntityProxyManager - Invalid proxy message: "+input); + } + } + } + } catch (IOException ex) { + Logger.error("ProxyServerConnection - Disconnected from "+serverName+":"+serverPort); + try { + serverStream.close(); + serverConnection.close(); + } catch (IOException e1) { } + + + serverStream = null; + serverConnection = null; + } + } + + if (serverStream != null) { + try { + Logger.msg(1, "Disconnecting from proxy server on "+serverName+":"+serverPort); + serverStream.println(ProxyMessage.byeMessage.toString()); + serverStream.close(); + serverConnection.close(); + serverConnection = null; + } catch (Exception e) { + Logger.error("Error disconnecting from proxy server."); + } + } + } + + public void connect() { + Logger.msg(3, "ProxyServerConnection - connecting to proxy server on "+serverName+":"+serverPort); + try { + serverConnection = new Socket(serverName, serverPort); + serverConnection.setKeepAlive(true); + serverIsActive = true; + serverConnection.setSoTimeout(5000); + serverStream = new PrintWriter(serverConnection.getOutputStream(), true); + Logger.msg("Connected to proxy server on "+serverName+":"+serverPort); + manager.resubscribe(this); + } catch (Exception e) { + Logger.msg(3, "Could not connect to proxy server. Retrying in 5s"); + try { Thread.sleep(5000); } catch (InterruptedException ex) { } + serverStream = null; + serverConnection = null; + serverIsActive = false; + } + } + + public void shutdown() { + Logger.msg("Proxy Client: flagging shutdown."); + listening = false; + } + + /** + * @param sub + */ + public void sendMessage(ProxyMessage sub) { + if (serverStream != null) + serverStream.println(sub); + } + +} + diff --git a/src/main/java/org/cristalise/kernel/entity/proxy/package-info.java b/src/main/java/org/cristalise/kernel/entity/proxy/package-info.java new file mode 100644 index 0000000..6391b65 --- /dev/null +++ b/src/main/java/org/cristalise/kernel/entity/proxy/package-info.java @@ -0,0 +1,76 @@ +/** + * This file is part of the CRISTAL-iSE kernel. + * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved. + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; either version 3 of the License, or (at + * your option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * + * http://www.fsf.org/licensing/licenses/lgpl.html + */ +/** + * The Proxy API is a major part of the client-side functionality of the + * CRISTAL API, which provides client-side proxy objects that represent the + * Items and Agents on the server. It is the main entry point for many + * components, such as Scripts and Job execution. An AgentProxy is returned on + * login, and should be used as the root for all user-based CRISTAL interactions. + * + *

The Proxy API provides the following functionality: + * + *

    + *
  • Transparent storage integration - Combines direct database access + * with remote calls to data retrieval methods on the Items. This allows client + * processes to load Item data directly from databases whenever possible + * without bothering the CRISTAL server. For example, the LDAP Lookup + * implementation allows client processes to load Item Properties directly from + * the LDAP server.
  • + * + *
  • Data object browsing and loading - The proxy objects allow client + * processes to browse through the storage cluster structure beneath the Item, + * and access the objects directly without having to unmarshall their XML forms. + * All object types have their own get methods, so there's no need to construct + * their paths nor cast. + * + *
  • Item object and directory change notification - When a proxy + * object is created, it notifies the CRISTAL server that its Item is located + * on, and it notified of all additions, deletions and modifications of objects + * within that Item so it can remain up-to-date. Client applications may use + * the {@link ProxyObserver} interface to be notified of changes, using + * {@link MemberSubscription} instances to set up push subscriptions to cluster + * contents. It also provides a mechanism for subscribing to directory paths, + * so that domain tree browsers can implement asynchronous loading and update + * themselves when the tree changes.
  • + * + *
  • Job querying - Job objects may be retrieved directly from an + * ItemProxy, and may also be filtered by Activity name.
  • + * + *
  • Job execution - The {@link AgentProxy} provides the main + * execution method for Jobs. This method performs outcome validation and + * executes required CRISTAL Scripts in the client process before the execution + * is requested on the server. Additional execution methods to call Predefined + * Steps are also available. + * + *
  • Utility methods for resolution and marshalling - The AgentProxy + * provides utility methods for finding Items in the directory by name, path, + * or system key, and gives access to the Castor XML marshalling system to + * transform CRISTAL objects to XML and back again.
  • + *
+ *

The core object of the Proxy API is the ProxyManager, which is initialized + * as a static member of the Gateway on initialization. This object can be used + * to create a Proxy object from a Path from the directory, and maintains a + * connection to the server called the Proxy Update Notification Channel, + * through which it subscribes to Items it holds proxies for so it can be + * informed of changes to Item data through {@link ProxyMessage} objects. + * + */ +package org.cristalise.kernel.entity.proxy; \ No newline at end of file -- cgit v1.2.3