From b086f57f56bf0eb9dab9cf321a0f69aaaae84347 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 30 May 2012 08:37:45 +0200 Subject: Initial Maven Conversion --- .../java/com/c2kernel/process/AbstractMain.java | 161 ++++++++ src/main/java/com/c2kernel/process/Bootstrap.java | 288 ++++++++++++++ src/main/java/com/c2kernel/process/Gateway.java | 429 +++++++++++++++++++++ .../java/com/c2kernel/process/ItemHTTPBridge.java | 52 +++ src/main/java/com/c2kernel/process/Module.java | 303 +++++++++++++++ .../java/com/c2kernel/process/ModuleManager.java | 82 ++++ .../java/com/c2kernel/process/StandardClient.java | 18 + .../java/com/c2kernel/process/StandardServer.java | 127 ++++++ .../java/com/c2kernel/process/UserCodeProcess.java | 234 +++++++++++ 9 files changed, 1694 insertions(+) create mode 100644 src/main/java/com/c2kernel/process/AbstractMain.java create mode 100644 src/main/java/com/c2kernel/process/Bootstrap.java create mode 100644 src/main/java/com/c2kernel/process/Gateway.java create mode 100644 src/main/java/com/c2kernel/process/ItemHTTPBridge.java create mode 100644 src/main/java/com/c2kernel/process/Module.java create mode 100644 src/main/java/com/c2kernel/process/ModuleManager.java create mode 100644 src/main/java/com/c2kernel/process/StandardClient.java create mode 100644 src/main/java/com/c2kernel/process/StandardServer.java create mode 100644 src/main/java/com/c2kernel/process/UserCodeProcess.java (limited to 'src/main/java/com/c2kernel/process') diff --git a/src/main/java/com/c2kernel/process/AbstractMain.java b/src/main/java/com/c2kernel/process/AbstractMain.java new file mode 100644 index 0000000..7401d1b --- /dev/null +++ b/src/main/java/com/c2kernel/process/AbstractMain.java @@ -0,0 +1,161 @@ +/************************************************************************** + * AbstractMain + * + * $Revision: 1.67 $ + * $Date: 2004/10/25 15:27:35 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.process; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; + +import com.c2kernel.utils.FileStringUtility; +import com.c2kernel.utils.Logger; + +/************************************************************************** + * + * @author $Author: abranson $ $Date: 2004/10/25 15:27:35 $ + * @version $Revision: 1.67 $ + **************************************************************************/ +abstract public class AbstractMain +{ + public static boolean runningAsWrapper = false; + + /************************************************************************** + * + **************************************************************************/ + static protected void usage() + { + System.out.println(); + System.out.println("USAGE: com.c2kernel.process.AbstractMain \n" + + " -config \n" + + " [-connect ] (or LocalCentre in conf)\n" + + " [-resURL c2kernel/resources] (or KernelResourceURL in conf)\n" + + " [-domResURL domain/resources] (or DomainResourceURL in conf)\n" + + " [-h] [-help] \n" + + " [-logLevel 0-19] \n" + + " [-logFile ]"); + Logger.die("Initialisation error"); + } + + /************************************************************************** + * reading and setting the standard c2k input paramaters + **************************************************************************/ + static public java.util.Properties readC2KArgs( String[] args ) + { + int i = 0; + String configPath = null; + String connectPath = null; + java.util.Properties c2kProps = null; + int logLevel = 0; + PrintStream logStream = System.out; + String centreId = null; + + try + { + if( args != null ) + { + while( i < args.length ) + { + if( args[i].equals("-h") || args[i].equals("-help") ) + { + usage(); + } + else if(args[i].equals("-config")) + { + if( (i+1) >= args.length ) + { + System.out.println("AbstractMain::readC2KArgs() - argument expected " + + "for -config"); + usage(); + } + System.out.println("Config file: "+args[i+1]); + configPath = args[++i]; + + } + else if(args[i].equals("-connect")) + { + if( (i+1) < args.length ) // batch file will have no arg if no cmd line arg + { + connectPath = args[++i]; + } + } + else if(args[i].equals("-logLevel")) + { + if( (i+1) >= args.length ) + { + System.out.println("AbstractMain::readC2KArgs() - argument expected " + + "for -logLevel"); + usage(); + } + logLevel = Integer.parseInt(args[++i]); + } + else if(args[i].equals("-logFile")) + { + if( (i+1) >= args.length ) + { + System.out.println("AbstractMain::readC2KArgs() - argument expected " + + "for -logFile"); + usage(); + } + logStream = new PrintStream(new FileOutputStream(args[++i], true)); + } + i++; + } + + // Set up log stream + Logger.addLogStream(logStream, logLevel); + + if (configPath == null) { + System.out.println("No config file specified"); + usage(); + } + + // Load config & connect files into c2kprops + c2kProps = FileStringUtility.loadConfigFile( configPath ); + + if (connectPath == null) { + // see if LC is listed in the config + Logger.msg(6, "No connect file specified in arguments. Looking in config."); + centreId = c2kProps.getProperty("LocalCentre"); + if (centreId!= null) connectPath = "connect/"+centreId+".clc"; + } + + if (connectPath != null) { + Logger.msg(6, "Connect file: "+connectPath); + if (centreId == null) { + String connectFileName = new File(connectPath).getName(); + centreId = connectFileName.substring(0, connectFileName.lastIndexOf(".clc")); + c2kProps.setProperty("LocalCentre", centreId); + } + FileStringUtility.appendConfigFile( c2kProps, connectPath); + } + else { + System.out.println("No connect file specified in args nor config file. Cannot continue."); + usage(); + } + } + else + { + System.out.println("AbstractMain::readC2KArgs() - no arguments!"); + usage(); + } + } + catch( Exception ex ) + { + System.out.println("Main::readC2KArgs() - bad arguments! "); + ex.printStackTrace(); + usage(); + } + + Logger.msg(7, "AbstractMain::standardSetUp() - readC2KArgs() DONE."); + + return c2kProps; + } + +} diff --git a/src/main/java/com/c2kernel/process/Bootstrap.java b/src/main/java/com/c2kernel/process/Bootstrap.java new file mode 100644 index 0000000..7eacd86 --- /dev/null +++ b/src/main/java/com/c2kernel/process/Bootstrap.java @@ -0,0 +1,288 @@ +package com.c2kernel.process; + +import java.net.InetAddress; +import java.util.Enumeration; +import java.util.StringTokenizer; + +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.TraceableEntity; +import com.c2kernel.entity.proxy.ItemProxy; +import com.c2kernel.events.Event; +import com.c2kernel.events.History; +import com.c2kernel.lifecycle.CompositeActivityDef; +import com.c2kernel.lifecycle.instance.CompositeActivity; +import com.c2kernel.lifecycle.instance.Workflow; +import com.c2kernel.lifecycle.instance.predefined.PredefinedStepContainer; +import com.c2kernel.lifecycle.instance.predefined.ServerPredefinedStepContainer; +import com.c2kernel.lifecycle.instance.stateMachine.States; +import com.c2kernel.lifecycle.instance.stateMachine.Transitions; +import com.c2kernel.lookup.AgentPath; +import com.c2kernel.lookup.DomainPath; +import com.c2kernel.lookup.EntityPath; +import com.c2kernel.lookup.LDAPLookup; +import com.c2kernel.lookup.Path; +import com.c2kernel.lookup.RolePath; +import com.c2kernel.persistency.ClusterStorage; +import com.c2kernel.persistency.outcome.Outcome; +import com.c2kernel.persistency.outcome.Viewpoint; +import com.c2kernel.property.Property; +import com.c2kernel.property.PropertyArrayList; +import com.c2kernel.property.PropertyDescription; +import com.c2kernel.property.PropertyDescriptionList; +import com.c2kernel.utils.CastorXMLUtility; +import com.c2kernel.utils.FileStringUtility; +import com.c2kernel.utils.LocalObjectLoader; +import com.c2kernel.utils.Logger; +import com.c2kernel.utils.Resource; + +/** + * @version $Revision: 1.25 $ $Date: 2006/01/10 09:48:32 $ + * @author $Author: abranson $ + */ + +public class Bootstrap +{ + static DomainPath thisServerPath; + + /** + * Run everything without timing-out the service wrapper + */ + public static void run() throws Exception { + // check for system agents + checkAdminAgents(); + + // create the server's mother item + createServerItem(); + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.currentThread().setName("Bootstrapper"); + + // make sure all of the boot items are up-to-date + verifyBootDataItems(); + + // verify the server item's wf + initServerItemWf(); + + Logger.msg("Bootstrap.run() - Bootstrapping complete"); + } catch (Exception e) { + Logger.error(e); + Logger.die("Exception performing bootstrap. Check that everything is OK."); + } + } + }).start(); + } + + /************************************************************************** + * Checks all kernel descriptions, stored in resources + **************************************************************************/ + public static void verifyBootDataItems() throws Exception { + String bootItems; + Logger.msg(1, "Verifying kernel boot items"); + bootItems = FileStringUtility.url2String(Resource.getKernelResourceURL("boot/allbootitems.txt")); + verifyBootDataItems(bootItems, null); + Logger.msg(1, "Boot data items complete"); + } + + private static void verifyBootDataItems(String bootList, String ns) { + StringTokenizer str = new StringTokenizer(bootList, "\n\r"); + while (str.hasMoreTokens()) { + String thisItem = str.nextToken(); + int delim = thisItem.indexOf('/'); + String itemType = thisItem.substring(0,delim); + String itemName = thisItem.substring(delim+1); + try { + String data = Resource.getTextResource(ns, "boot/"+thisItem+(itemType.equals("OD")?".xsd":".xml")); + if (data == null) + Logger.die("No data found for "+getDataType(itemType)+" "+itemName); + verifyResource(ns, itemName, itemType, data); + } catch (Exception e) { + Logger.error(e); + Logger.die("Error importing bootstrap items. Unsafe to continue."); + } + } + } + + + public static void verifyResource(String ns, String itemName, String itemType, String data) throws Exception { + Logger.msg(1, "Bootstrap.verifyResource() - Verifying data of "+getDataType(itemType)+" "+itemName); + Enumeration en = Gateway.getLDAPLookup().search(getTypeRoot(itemType), itemName); + ItemProxy thisProxy; + + if (!en.hasMoreElements()) { + Logger.msg("Bootstrap.verifyResource() - "+getDataType(itemType)+" "+itemName+" not found. Creating new."); + thisProxy = createResourceItem(itemType, itemName, ns); + } + else { + DomainPath path = (DomainPath)en.nextElement(); + thisProxy = (ItemProxy)Gateway.getProxyManager().getProxy(path); + try { + Viewpoint currentData = (Viewpoint)thisProxy.getObject(ClusterStorage.VIEWPOINT+"/"+getDataType(itemType)+"/last"); + String oldData = currentData.getOutcome().getData(); + if (data.equals(oldData)) { + Logger.msg(5, "Bootstrap.verifyResource() - Data identical, no update required"); + return; + } + } catch (ObjectNotFoundException ex) { + Logger.error("Bootstrap.verifyResource() - Item exists but no data found! Attempting to insert new."); + } + } + // data was missing or doesn't match + Logger.msg("Bootstrap.verifyResource() - Writing new data to "+getDataType(itemType)+" "+itemName); + History hist = new History(thisProxy.getSystemKey(), thisProxy); + Event newEvent = hist.addEvent("system", "Admin", Transitions.DONE, "Import", "Import", "Import", States.FINISHED); + Outcome newOutcome = new Outcome(newEvent.getID(), data, getDataType(itemType), 0); + Viewpoint newLastView = new Viewpoint(thisProxy.getSystemKey(), getDataType(itemType), "last", 0, newEvent.getID()); + Viewpoint newZeroView = new Viewpoint(thisProxy.getSystemKey(), getDataType(itemType), "0", 0, newEvent.getID()); + Gateway.getStorage().put(thisProxy.getSystemKey(), newOutcome, thisProxy); + Gateway.getStorage().put(thisProxy.getSystemKey(), newLastView, thisProxy); + Gateway.getStorage().put(thisProxy.getSystemKey(), newZeroView, thisProxy); + Gateway.getStorage().commit(thisProxy); + } + + /** + * @param itemType + * @param itemName + * @param data + */ + private static ItemProxy createResourceItem(String itemType, String itemName, String ns) throws Exception { + // create props + PropertyDescriptionList pdList = (PropertyDescriptionList)CastorXMLUtility.unmarshall(Resource.getTextResource(null, "boot/property/"+itemType+"Prop.xml")); + PropertyArrayList props = new PropertyArrayList(); + for (int i = 0; i < pdList.list.size(); i++) { + PropertyDescription pd = pdList.list.get(i); + String propName = pd.getName(); + String propVal = propName.equals("Name")?itemName:pd.getDefaultValue(); + props.list.add(new Property(propName, propVal)); + } + + EntityPath entityPath = Gateway.getLDAPLookup().getNextKeyManager().generateNextEntityKey(); + TraceableEntity newItem = (TraceableEntity)Gateway.getCorbaServer().createEntity(entityPath); + Gateway.getLDAPLookup().add(entityPath); + newItem.initialise( + 1, + CastorXMLUtility.marshall(props), + CastorXMLUtility.marshall(new CompositeActivity())); + DomainPath newDomPath = new DomainPath(getTypeRoot(itemType).toString()+"/system/"+(ns==null?"kernel":ns)+"/"+itemName); + newDomPath.setEntity(entityPath); + Gateway.getLDAPLookup().add(newDomPath); + return (ItemProxy)Gateway.getProxyManager().getProxy(entityPath); + } + + public static DomainPath getTypeRoot(String type) throws Exception { + if (type.equals("CA") || type.equals("EA")) + return new DomainPath("/desc/ActivityDesc/"); + if (type.equals("SC")) + return new DomainPath("/desc/Script/"); + if (type.equals("OD")) + return new DomainPath("/desc/OutcomeDesc/"); + throw new Exception("Unknown bootstrap item type: "+type); + } + + private static String getDataType(String type) throws Exception { + if (type.equals("CA")) + return "CompositeActivityDef"; + if (type.equals("EA")) + return "ElementaryActivityDef"; + if (type.equals("OD")) + return "Schema"; + if (type.equals("SC")) + return "Script"; + throw new Exception("Unknown bootstrap item type: "+type); + + } + + /************************************************************************** + * Checks for the existence of the admin users so you can use Cristal + **************************************************************************/ + private static void checkAgent(String name, String pass, String role, boolean joblist) throws Exception { + Logger.msg(1, "Bootstrap.checkAgent() - Checking for existence of '"+name+"' user."); + LDAPLookup lookup = Gateway.getLDAPLookup(); + try { + lookup.getRoleManager().getAgentPath(name); + Logger.msg(3, "Bootstrap.checkAgent() - User '"+name+"' found."); + return; + } catch (ObjectNotFoundException ex) { } + Logger.msg("Bootstrap.checkAgent() - User '"+name+"' not found. Creating."); + + RolePath rolePath; + try { + rolePath = lookup.getRoleManager().getRolePath(role); + } catch (ObjectNotFoundException ex) { + rolePath = lookup.getRoleManager().createRole(role, joblist); + } + + try { + EntityPath entityPath = lookup.getNextKeyManager().generateNextEntityKey(); + AgentPath agentPath = new AgentPath(entityPath.getSysKey(), name); + agentPath.setPassword(pass); + Gateway.getCorbaServer().createEntity(agentPath); + Gateway.getLDAPLookup().add(agentPath); + + // assign admin role + Logger.msg("Bootstrap.checkAgent() - Assigning role '"+role+"'"); + rolePath.addAgent(agentPath); + Gateway.getStorage().put(agentPath.getSysKey(), new Property("Name", name), null); + Gateway.getStorage().put(agentPath.getSysKey(), new Property("Type", "Agent"), null); + Logger.msg("Bootstrap.checkAgent() - Done"); + } catch (Exception ex) { + Logger.error("Unable to create "+name+" user."); + throw ex; + } + } + + /** + * + */ + public static void checkAdminAgents() throws Exception { + // check for administrative user + String adminPassword = Gateway.getProperty("AdminPassword", "admin12345"); + + checkAgent("admin", adminPassword, "Admin", false); + + // check for import user + checkAgent("system", adminPassword, "Admin", false); + + // check for local usercode user + checkAgent(InetAddress.getLocalHost().getHostName(), "uc", "UserCode", true); + } + + public static void createServerItem() throws Exception { + String serverName = Gateway.getProperty("ItemServer.name"); + thisServerPath = new DomainPath("/servers/"+serverName); + EntityPath serverEntity; + try { + serverEntity = thisServerPath.getEntity(); + } catch (ObjectNotFoundException ex) { + Logger.msg("Creating server item "+thisServerPath); + serverEntity = Gateway.getLDAPLookup().getNextKeyManager().generateNextEntityKey(); + Gateway.getCorbaServer().createEntity(serverEntity); + Gateway.getLDAPLookup().add(serverEntity); + thisServerPath.setEntity(serverEntity); + Gateway.getLDAPLookup().add(thisServerPath); + } + Gateway.getStorage().put(serverEntity.getSysKey(), new Property("Name", serverName), null); + Gateway.getStorage().put(serverEntity.getSysKey(), new Property("Type", "Server"), null); + Gateway.getStorage().put(serverEntity.getSysKey(), new Property("KernelVersion", Resource.getKernelVersion()), null); + if (Gateway.getProperty("ItemServer.Proxy.port") != null) + Gateway.getStorage().put(serverEntity.getSysKey(), + new Property("ProxyPort", Gateway.getProperty("ItemServer.Proxy.port")), null); + if (Gateway.getProperty("ItemServer.Console.port") != null) + Gateway.getStorage().put(serverEntity.getSysKey(), + new Property("ConsolePort", Gateway.getProperty("ItemServer.Console.port")), null); + Gateway.getProxyManager().connectToProxyServer(Gateway.getProperty("ItemServer.name"), Integer.parseInt(Gateway.getProperty("ItemServer.Proxy.port"))); + + } + + public static void initServerItemWf() throws Exception { + CompositeActivityDef serverWfCa = (CompositeActivityDef)LocalObjectLoader.getActDef("ServerItemWorkflow", "last"); + Workflow wf = new Workflow((CompositeActivity)serverWfCa.instantiate()); + PredefinedStepContainer predef = (PredefinedStepContainer)wf.search("workflow/predefined"); + wf.getChildGraphModel().removeVertex(predef); + wf.addChild(new ServerPredefinedStepContainer(), predef.getCentrePoint()); + wf.initialise(thisServerPath.getSysKey(), Gateway.getLDAPLookup().getRoleManager().getAgentPath("system")); + Gateway.getStorage().put(thisServerPath.getSysKey(), wf, null); + // add this proxy server in case it was just registered, or the port has changed + } +} diff --git a/src/main/java/com/c2kernel/process/Gateway.java b/src/main/java/com/c2kernel/process/Gateway.java new file mode 100644 index 0000000..aebd19e --- /dev/null +++ b/src/main/java/com/c2kernel/process/Gateway.java @@ -0,0 +1,429 @@ +package com.c2kernel.process; + +/** + * @version $Revision: 1.17 $ $Date: 2005/10/12 12:51:54 $ + * @author $Author: abranson $ + */ + +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.Enumeration; +import java.util.Properties; + +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.CorbaServer; +import com.c2kernel.entity.proxy.AgentProxy; +import com.c2kernel.entity.proxy.EntityProxyManager; +import com.c2kernel.lookup.AgentPath; +import com.c2kernel.lookup.LDAPLookup; +import com.c2kernel.lookup.LDAPProperties; +import com.c2kernel.persistency.ClusterStorageException; +import com.c2kernel.persistency.TransactionManager; +import com.c2kernel.utils.CastorXMLUtility; +import com.c2kernel.utils.FileStringUtility; +import com.c2kernel.utils.Language; +import com.c2kernel.utils.Logger; +import com.c2kernel.utils.Resource; +import com.c2kernel.utils.server.SimpleTCPIPServer; + + +/************************************************************************** + * The Gateway is the central object of a CRISTAL process. It initializes, + * maintains and shuts down every other subsystem in both the client and the + * server. + * + * Child objects: + *
    + *
  • LDAPLookup - Provides access to the CRISTAL directory. Find or + * search for Items or Agents. + *
  • EntityProxyManager - Gives a local proxy object for Entities found + * in LDAP. Execute activities in Items, query or subscribe to Entity data. + *
  • TransactionManager - Access to the configured CRISTAL databases + *
  • CorbaServer - Manages the memory pool of active Entities + *
  • mORB - the Orbacus CORBA ORB + *
+ * + * @author $Author: abranson $ $Date: 2005/10/12 12:51:54 $ + * @version $Revision: 1.17 $ + **************************************************************************/ + +public class Gateway +{ + static private Properties mC2KProps; + static private ModuleManager mModules; + static private org.omg.CORBA.ORB mORB; + static private LDAPLookup mLDAPLookup; + static private TransactionManager mStorage; + static private EntityProxyManager mProxyManager; + static private CorbaServer mCorbaServer; + static private SimpleTCPIPServer mHTTPServer; + + + + private Gateway() { } + + /** + * Initialises the Gateway and all of the client objects it holds, with + * the exception of the LDAPLookup, which is initialised during connect() + * + * @param props - java.util.Properties containing all application properties. + * If null, the java system properties are used + * @throws InvalidDataException - invalid properties caused a failure in initialisation + */ + static public void init(Properties props, boolean isServer) throws InvalidDataException { + + // if supplied props are null, use system props + if (props == null) props = System.getProperties(); + + // report version info + Logger.msg("Kernel version: "+Resource.getKernelVersion()); + + // init module manager + try { + mModules = new ModuleManager(ClassLoader.getSystemResources("module.xml"), isServer); + } catch (IOException e) { + Logger.error(e); + throw new InvalidDataException("Could not load module definitions. Classpath problem", ""); + } + + // Start with default props from kernel jar + try { + mC2KProps = FileStringUtility.loadConfigFile( Resource.getKernelResourceURL("textFiles/defaultConf.properties").toString()); + } catch (MalformedURLException ex) { + Logger.die("Default properties not found. Probable cause is missing resources"); + } + + // merge in module props + Properties moduleProperties = mModules.getAllModuleProperties(); + for (Enumeration e = moduleProperties.propertyNames(); e.hasMoreElements();) { + String propName = (String)e.nextElement(); + mC2KProps.put(propName, moduleProperties.get(propName)); + } + + // Overwrite with supplied props + for (Enumeration e = props.propertyNames(); e.hasMoreElements();) { + String propName = (String)e.nextElement(); + mC2KProps.put(propName, props.get(propName)); + } + + // dump properties + dumpC2KProps(7); + + // load kernel mapfiles + try { + CastorXMLUtility.loadMapsFrom(Resource.getKernelResourceURL("mapFiles/")); + } catch (MalformedURLException e1) { + throw new InvalidDataException("Invalid Resource Location", ""); + } + + //Initialise language file + String languageFile = getProperty("language.file"); + if (languageFile != null && languageFile.length() > 0) { + Language.isTranlated=true; + Language.mTableOfTranslation = FileStringUtility.loadLanguageFile(languageFile); + } + + // run module startup scripts + mModules.runScripts("startup"); + } + + /** + * Makes this process capable of creating and managing server entities. Runs the + * bootstrap to create the root LDAP contexts, initialises the CORBA server and + * time-out manager. + * + * @throws InvalidDataException - error initialising + */ + static public void startServer() throws InvalidDataException { + try { + // check top level LDAP contexts + mLDAPLookup.install(); + + // start entity proxy server + EntityProxyManager.initServer(); + + // Init ORB - set various config to sys properties + java.util.Properties sysProps = System.getProperties(); + String serverName = getProperty("ItemServer.name"); + if (serverName != null) + sysProps.put("ORBHost", serverName); + String serverPort = getProperty("ItemServer.iiop", "1500"); + sysProps.put("ORBPort", serverPort); + //TODO: externalize this (or replace corba completely) + sysProps.put("com.sun.CORBA.POA.ORBServerId", "1"); + sysProps.put("com.sun.CORBA.POA.ORBPersistentServerPort", serverPort); + + //Standard initialisation of the ORB + mORB = org.omg.CORBA.ORB.init(new String[0], sysProps); + + Logger.msg("Gateway.init() - ORB initialised. ORB is " + mORB.getClass().getName() ); + + // start corba server components + mCorbaServer = new CorbaServer(); + + // start checking bootstrap items + Bootstrap.run(); + + // register modules + mModules.registerModules(); + + } catch (Exception ex) { + Logger.error(ex); + Logger.die("Exception starting server components. Shutting down."); + } + + // start the http server +// try { +// int httpPort = Integer.parseInt(Gateway.getProperty("ItemServer.HTTP.port")); +// Logger.msg(2, "Starting HTTP Server on port "+httpPort); +// mHTTPServer = new SimpleTCPIPServer(httpPort, ItemHTTPBridge.class, 5); +// mHTTPServer.startListening(); +// } catch (NumberFormatException ex) { +// Logger.msg(3, "Invalid or no HTTP port defined. HTTP server not available."); +// } + + System.out.println("Server '"+Gateway.getCentreId()+"' initialised."); + } + + public static ModuleManager getModuleManager() { + return mModules; + } + + /** + * Connects to the LDAP server in an administrative context - using the admin username and + * password given in the LDAP.user and LDAP.password props of the kernel properties. + * + * @throws InvalidDataException - bad params + * @throws ClusterStorageException - error starting storages + */ + static public void connect() + throws InvalidDataException, + ClusterStorageException + { + LDAPProperties ldapProps = new LDAPProperties(); + + if( ldapProps.mHost != null && ldapProps.mPort != null && + ldapProps.mUser != null && ldapProps.mPassword != null ) + { + try + { + mLDAPLookup = new LDAPLookup(ldapProps); + } + catch (Exception ex) + { + Logger.error(ex); + throw new InvalidDataException("Cannot authenticate. Name and/or password invalid.", ""); + } + } + else + { + Logger.error("LDAP not configured properly."); + throw new InvalidDataException("Cannot authenticate. Name and/or password invalid.", ""); + } + + setup(); + } + + /** + * Authenticates a user and returns and AgentProxy on them without overriding the system LDAP context. + * Useful for handling multiple users in one context e.g. on a web server + * + * @param agentName - username + * @param agentPassword - password + * @return AgentProxy on that user + * @throws InvalidDataException + * @throws ObjectNotFoundException + */ + static public AgentProxy login(String agentName, String agentPassword) throws InvalidDataException, ObjectNotFoundException { + LDAPProperties ldapProps = new LDAPProperties(); + AgentPath agentPath; + try { + agentPath = mLDAPLookup.getRoleManager().getAgentPath(agentName); + } catch (Exception ex) { + Logger.error(ex); + throw new ObjectNotFoundException("Could not resolve agent", ""); + } + String agentDN = agentPath.getFullDN(); + ldapProps.mUser = agentDN; + ldapProps.mPassword = agentPassword; + + try { + LDAPLookup.createConnection(ldapProps); + return (AgentProxy)getProxyManager().getProxy(mLDAPLookup.getRoleManager().getAgentPath(agentName)); + } catch (Exception ex) { + Logger.error(ex); + throw new InvalidDataException("Could not log in", ""); + } + } + + + /** + * Logs into the LDAP server with the given username and password, and initialises the lookup. + * + * @param agentName - username + * @param agentPassword - password + * @return an AgentProxy on the requested user + * @throws InvalidDataException + */ + static public AgentProxy connect(String agentName, String agentPassword) + throws InvalidDataException + { + + LDAPProperties ldapProps = new LDAPProperties(); + if (ldapProps.mHost!=null && ldapProps.mPort!= null && ldapProps.mLocalPath!=null ) + { + try { + ldapProps.mUser = ""; + ldapProps.mPassword = ""; + mLDAPLookup = new LDAPLookup(ldapProps); + String agentDN = mLDAPLookup.getRoleManager().getAgentPath(agentName).getFullDN(); + + //found agentDN, try to log in with it + ldapProps.mUser = agentDN; + ldapProps.mPassword = agentPassword; + mLDAPLookup = new LDAPLookup(ldapProps); + + // find agent proxy + AgentPath agentPath = mLDAPLookup.getRoleManager().getAgentPath(agentName); + + if (agentPath!=null) + { + setup(); + return (AgentProxy) mProxyManager.getProxy(agentPath); + } + else + { + throw new InvalidDataException("The agentDN " +agentDN+ " is invalid.", ""); + } + } catch (ClusterStorageException e) { + throw new InvalidDataException(Language.translate("Error initialising storage")+Language.translate(". See log."), ""); + } catch (ObjectNotFoundException e) { + throw new InvalidDataException(Language.translate("Invalid username/password"), ""); + } catch (Exception e) { + throw new InvalidDataException(Language.translate("Could not log in")+": "+Language.translate(e.getMessage()), ""); + } + + } + else + { + throw new InvalidDataException("Cannot log in. Some connection properties are not set.", ""); + } + + } + + /** + * Initializes the storage and proxy manager, called during connect. + * + * @throws InvalidDataException + * @throws ClusterStorageException + */ + static private void setup() + throws InvalidDataException, + ClusterStorageException + { + + // Init storages + mStorage = new TransactionManager(); + mProxyManager = new EntityProxyManager(); + + } + + /** + * Shuts down all kernel api objects + */ + public static void close() + { + // run shutdown module scripts + mModules.runScripts("shutdown"); + + // shut down servers if running + if (mCorbaServer != null) + mCorbaServer.close(); + mCorbaServer = null; + if (mHTTPServer != null) + mHTTPServer.stopListening(); + mHTTPServer = null; + + // disconnect from storages + if (mStorage != null) + mStorage.close(); + mStorage = null; + + // disconnect from ldap + if (mLDAPLookup != null) + mLDAPLookup.disconnect(); + mLDAPLookup = null; + + // shut down proxy manager + if (mProxyManager != null) + mProxyManager.shutdown(); + mProxyManager = null; + EntityProxyManager.shutdownServer(); + + // close log consoles + Logger.closeConsole(); + + // finally, destroy the ORB + getORB().destroy(); + } + + static public org.omg.CORBA.ORB getORB() + { + if (mORB == null) + mORB = org.omg.CORBA.ORB.init(new String[0], null); + return mORB; + } + + static public LDAPLookup getLDAPLookup() + { + return mLDAPLookup; + } + + static public CorbaServer getCorbaServer() + { + return mCorbaServer; + } + + static public TransactionManager getStorage() + { + return mStorage; + } + + static public EntityProxyManager getProxyManager() + { + return mProxyManager; + } + + static public String getCentreId() { + return getProperty("LocalCentre"); + } + + static public String getProperty(String propName) { + return getProperty(propName, null); + } + + static public String getProperty(String propName, String defaultValue) { + if (mC2KProps == null) return defaultValue; + return mC2KProps.getProperty(propName, defaultValue); + } + + static public void setProperty(String propName, String propValue) { + if (mC2KProps == null) return; + mC2KProps.put(propName, propValue); + } + + static public Enumeration propertyNames() { + return mC2KProps.propertyNames(); + } + + static public void dumpC2KProps(int logLevel) { + if (!Logger.doLog(logLevel)) return; + Logger.msg(logLevel, "C2K Properties:"); + for (Enumeration e = propertyNames(); e.hasMoreElements();) { + String name = (String) e.nextElement(); + Logger.msg(" "+name+": "+getProperty(name)); + } + } +} + diff --git a/src/main/java/com/c2kernel/process/ItemHTTPBridge.java b/src/main/java/com/c2kernel/process/ItemHTTPBridge.java new file mode 100644 index 0000000..40f48f7 --- /dev/null +++ b/src/main/java/com/c2kernel/process/ItemHTTPBridge.java @@ -0,0 +1,52 @@ +package com.c2kernel.process; + +import java.util.StringTokenizer; + +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.utils.CastorXMLUtility; +import com.c2kernel.utils.server.HTTPRequestHandler; + +/* QueryData over HTTP Socket Handler + * Processes an HTTP request consisting of // + * and returns that kernel object as XML + * Currently supports GET requests. + * REVISIT: POST calls Item.request() + */ + +public class ItemHTTPBridge extends HTTPRequestHandler { + + public ItemHTTPBridge() { } + + @Override + public String getName() { + return "Item HTTP Server"; + } + + @Override + public String processRequest() { + System.out.println("ItemHTTPBridge::ProcessRequest()"); + StringTokenizer tok = new StringTokenizer(resource, "?"); + //String itemPath = tok.nextToken(); + String query = tok.nextToken(); + int sysKey = -1; + //Path path = Gateway.getLDAPLookup().; + if (method.equals("GET")) { + try { + //DomainPath domPath = new DomainPath(itemPath); + //EntityPath entityPath = domPath.getEntity(); + + if (sysKey > -1) { + C2KLocalObject response = Gateway.getStorage().get(sysKey, query, null); + return CastorXMLUtility.marshall(response); + } + else + return error("404 Not Found", "The entity "+sysKey+" you requested was not found."); + } + catch (Exception e) { + return error("400 Bad Request", "Usage: GET <path to item>?<path to kernel object>
"+e.getClass().getName()); + } + } + return(super.processRequest()); + } + +} diff --git a/src/main/java/com/c2kernel/process/Module.java b/src/main/java/com/c2kernel/process/Module.java new file mode 100644 index 0000000..e2a4f2e --- /dev/null +++ b/src/main/java/com/c2kernel/process/Module.java @@ -0,0 +1,303 @@ +package com.c2kernel.process; + +import java.io.StringReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Properties; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; + +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; +import org.w3c.dom.Text; +import org.xml.sax.InputSource; + +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.proxy.ItemProxy; +import com.c2kernel.events.Event; +import com.c2kernel.events.History; +import com.c2kernel.lifecycle.instance.predefined.entitycreation.Dependency; +import com.c2kernel.lifecycle.instance.predefined.entitycreation.DependencyMember; +import com.c2kernel.lifecycle.instance.predefined.entitycreation.NewAgent; +import com.c2kernel.lifecycle.instance.predefined.entitycreation.NewItem; +import com.c2kernel.lifecycle.instance.predefined.entitycreation.Property; +import com.c2kernel.lifecycle.instance.stateMachine.States; +import com.c2kernel.lifecycle.instance.stateMachine.Transitions; +import com.c2kernel.lookup.DomainPath; +import com.c2kernel.persistency.outcome.Outcome; +import com.c2kernel.persistency.outcome.Viewpoint; +import com.c2kernel.scripting.ErrorInfo; +import com.c2kernel.scripting.Script; +import com.c2kernel.scripting.ScriptingEngineException; +import com.c2kernel.utils.CastorXMLUtility; +import com.c2kernel.utils.Logger; +import com.c2kernel.utils.Resource; + +public class Module { + + private final String ns, name, desc, version; + private String resURL; + private final ArrayList dependency = new ArrayList(); + private final Properties clientProps = new Properties(); + private final Properties serverProps = new Properties(); + private final HashMap clientScripts = new HashMap(); + private final HashMap serverScripts = new HashMap(); + private final ArrayList imports = new ArrayList(); + private static DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); + private final DocumentBuilder parser; + + static { + dbf.setValidating(false); + dbf.setNamespaceAware(false); + } + + public Module(String moduleXML) throws Exception { + parser = dbf.newDocumentBuilder(); + Document moduleDOM = parser.parse(new InputSource(new StringReader(moduleXML))); + + Element root = (Element)moduleDOM.getElementsByTagName("CristalModule").item(0); + + // Get module metadata + ns = root.getAttribute("ns"); + name = root.getAttribute("name"); + Element info = (Element)moduleDOM.getElementsByTagName("Info").item(0); + desc = ((Text)info.getElementsByTagName("Description").item(0).getFirstChild()).getData(); + version = ((Text)info.getElementsByTagName("Version").item(0).getFirstChild()).getData(); + NodeList nl = info.getElementsByTagName("Dependency"); + for (int i=0; i0) { + resURL = ((Text)nl.item(0).getFirstChild()).getData(); + Resource.addModuleBaseURL(ns, resURL); + } + + // Get config properties + nl = root.getElementsByTagName("Config"); + for (int i=0; i0) { + Element impElem = (Element)nl.item(0); + nl = impElem.getChildNodes(); + for (int i=0; i scripts = isServer?serverScripts:clientScripts; + Script thisScript = scripts.get(event); + if (thisScript == null) return null; + try { + Object result = thisScript.execute(); + if (result instanceof ErrorInfo) + return (ErrorInfo)result; + else + return new ErrorInfo(result.toString()); + } catch (ScriptingEngineException ex) { + Logger.error(ex); + return new ErrorInfo("Error running "+event+" script in module "+ns); + } + } + + public void importAll(ItemProxy serverEntity) { + for (ModuleImport thisImp : imports) { + if (thisImp instanceof ModuleResource) { + ModuleResource thisRes = (ModuleResource)thisImp; + try { + Bootstrap.verifyResource(ns, thisRes.importName, thisRes.resourceType, Resource.getTextResource(ns, thisRes.resourceLocation)); + } catch (Exception ex) { + Logger.error(ex); + } + } + else if (thisImp instanceof ModuleItem) { + ModuleItem thisItem = (ModuleItem)thisImp; + try { + NewItem item = new NewItem(thisItem.importName, "/desc/"+ns, thisItem.workflow); + item.propertyList = thisItem.props; + DomainPath itemPath = new DomainPath(new DomainPath(item.initialPath), item.name); + if (itemPath.exists()) continue; + serverEntity.requestAction( + Gateway.getLDAPLookup().getRoleManager().getAgentPath("system").getSysKey(), + "workflow/predefined/CreateNewItem", + Transitions.DONE, + CastorXMLUtility.marshall(item)); + Logger.msg("Module.importAll() - Created item: "+thisItem.importName); + ItemProxy newProxy = (ItemProxy)Gateway.getProxyManager().getProxy(itemPath); + History hist = new History(newProxy.getSystemKey(), newProxy); + for (String thisView : thisItem.outcomes.keySet()) { + String[] info = thisView.split(":"); + int version = Integer.parseInt(info[1]); + String data = Resource.getTextResource(ns, thisItem.outcomes.get(thisView)); + Event newEvent = hist.addEvent("system", "Admin", Transitions.DONE, "Import", "Import", "Import", States.FINISHED); + Outcome newOutcome = new Outcome(newEvent.getID(), data, info[0], version); + Viewpoint newLastView = new Viewpoint(newProxy.getSystemKey(), info[0], info[2], version, newEvent.getID()); + Gateway.getStorage().put(newProxy.getSystemKey(), newOutcome, newProxy); + Gateway.getStorage().put(newProxy.getSystemKey(), newLastView, newProxy); + } + for (Dependency thisDep : thisItem.deps) { + Gateway.getStorage().put(newProxy.getSystemKey(), thisDep.create(), newProxy); + } + Gateway.getStorage().commit(newProxy); + } catch (Exception ex) { + Logger.error("Error importing item "+thisItem.importName+" from module "+name); + Logger.error(ex); + } + } + else if (thisImp instanceof ModuleAgent) { + ModuleAgent thisAgent = (ModuleAgent)thisImp; + try { + Gateway.getLDAPLookup().getRoleManager().getAgentPath(thisAgent.importName); + Logger.msg(3, "Module.importAll() - User '"+thisAgent.importName+"' found."); + return; + } catch (ObjectNotFoundException ex) { } + Logger.msg("Module.importAll() - User '"+thisAgent.importName+"' not found. Creating."); + + NewAgent agent = new NewAgent(thisAgent.importName, thisAgent.password); + agent.roles = thisAgent.roles; + try { + serverEntity.requestAction( + Gateway.getLDAPLookup().getRoleManager().getAgentPath("system").getSysKey(), + "workflow/predefined/CreateNewAgent", + Transitions.DONE, + CastorXMLUtility.marshall(agent)); + } catch (Exception ex) { + Logger.error("Error importing agent "+thisAgent.importName+" from module "+name); + Logger.error(ex); + } + } + } + } + + public Properties getClientProperties() { + return clientProps; + } + + public Properties getServerProperties() { + return serverProps; + } + + public String getNs() { + return ns; + } + public String getName() { + return name; + } + public String getDesc() { + return desc; + } + public String getVersion() { + return version; + } + public String getResURL() { + return resURL; + } + public ArrayList getDependencies() { + return dependency; + } + public boolean hasDependency(String dep) { + return dependency.contains(dep); + } + + public abstract class ModuleImport { + String importName; + } + + public class ModuleResource extends ModuleImport { + String resourceType; + String resourceLocation; + } + + public class ModuleItem extends ModuleImport { + ArrayList props = new ArrayList(); + HashMap outcomes = new HashMap(); + ArrayList deps = new ArrayList(); + String workflow; + } + + public class ModuleAgent extends ModuleImport { + String password; + ArrayList roles = new ArrayList(); + } +} \ No newline at end of file diff --git a/src/main/java/com/c2kernel/process/ModuleManager.java b/src/main/java/com/c2kernel/process/ModuleManager.java new file mode 100644 index 0000000..e7be9e8 --- /dev/null +++ b/src/main/java/com/c2kernel/process/ModuleManager.java @@ -0,0 +1,82 @@ +package com.c2kernel.process; + +import java.net.URL; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.Properties; + +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.proxy.ItemProxy; +import com.c2kernel.lookup.DomainPath; +import com.c2kernel.utils.FileStringUtility; +import com.c2kernel.utils.Logger; + +public class ModuleManager { + ArrayList modules = new ArrayList(); + Properties props = new Properties(); + boolean isServer; + + public ModuleManager(Enumeration moduleEnum, boolean isServer) { + this.isServer = isServer; + ArrayList loadedModules = new ArrayList(); + while(moduleEnum.hasMoreElements()) { + URL newModuleURL = moduleEnum.nextElement(); + try { + Module newModule = new Module(FileStringUtility.url2String(newModuleURL)); + modules.add(newModule); + loadedModules.add(newModule.getName()); + Properties modProp = isServer?newModule.getServerProperties():newModule.getClientProperties(); + for (Enumeration e = modProp.propertyNames(); e.hasMoreElements();) { + String propName = (String)e.nextElement(); + props.put(propName, modProp.get(propName)); + } + } catch (Exception e) { + Logger.error("Could not load module description from "+newModuleURL); + Logger.error(e); + } + } + + Logger.debug(5, "Checking dependencies"); + boolean depFailed = false; + for (Module thisMod : modules) { + ArrayList deps = thisMod.getDependencies(); + for (String dep : deps) { + if (!loadedModules.contains(dep)) { + Logger.error("UNMET MODULE DEPENDENCY: "+thisMod.getName()+" requires "+dep); + depFailed = true; + } + } + } + if (depFailed) Logger.die("Unmet module dependencies. Cannot continue"); + } + + public String getModuleVersions() { + StringBuffer ver = new StringBuffer(); + for (Module thisMod : modules) { + if (ver.length()>0) ver.append(";"); + ver.append(thisMod.getName()+"("+thisMod.getVersion()+")"); + } + return ver.toString(); + } + + + public Properties getAllModuleProperties() { + return props; + } + + public void runScripts(String event) { + for (Module thisMod : modules) { + thisMod.runScript(event, isServer); + } + } + + public void registerModules() throws ObjectNotFoundException { + ItemProxy serverEntity = (ItemProxy)Gateway.getProxyManager().getProxy(new DomainPath("/servers/"+Gateway.getProperty("ItemServer.name"))); + Logger.debug(3, "Registering modules"); + for (Module thisMod : modules) { + Logger.debug(4, "Registering module "+thisMod.getName()); + thisMod.importAll(serverEntity); + Logger.msg("Module "+thisMod.getName()+" registered"); + } + } +} diff --git a/src/main/java/com/c2kernel/process/StandardClient.java b/src/main/java/com/c2kernel/process/StandardClient.java new file mode 100644 index 0000000..f6f48ed --- /dev/null +++ b/src/main/java/com/c2kernel/process/StandardClient.java @@ -0,0 +1,18 @@ +/************************************************************************** + * StandardClient + * + * $Revision: 1.7 $ + * $Date: 2003/04/25 16:17:12 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.process; + +abstract public class StandardClient extends AbstractMain +{ + + //TODO: Auto-update from server + +} diff --git a/src/main/java/com/c2kernel/process/StandardServer.java b/src/main/java/com/c2kernel/process/StandardServer.java new file mode 100644 index 0000000..6804da5 --- /dev/null +++ b/src/main/java/com/c2kernel/process/StandardServer.java @@ -0,0 +1,127 @@ +/************************************************************************** + * StandardServer + * + * $Revision: 1.47 $ + * $Date: 2005/04/28 13:49:43 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.process; + +import org.tanukisoftware.wrapper.WrapperListener; +import org.tanukisoftware.wrapper.WrapperManager; + +import com.c2kernel.utils.Logger; + +/************************************************************************** + * Base class for all servers i.e. c2k processes that serve Entities + * + * @author $Author: abranson $ $Date: 2005/04/28 13:49:43 $ + * @version $Revision: 1.47 $ + **************************************************************************/ +public class StandardServer extends AbstractMain implements WrapperListener +{ + protected static StandardServer server; + + + /************************************************************************** + * C2KRootPOA suitable for Factory objects + **************************************************************************/ + + + + + + /************************************************************************** + * void StandardInitalisation( String[] ) + * + * Set-up calls to ORB, POA and Factorys, both optional and required. + **************************************************************************/ + protected void standardInitialisation( String[] args ) + throws Exception + { + // read args and init Gateway + Gateway.init(readC2KArgs(args), true); + + // connect to LDAP as root + Gateway.connect(); + + //start console + Logger.initConsole("ItemServer"); + + //initialize the server objects + Gateway.startServer(); + + Logger.msg(5, "StandardServer::standardInitialisation - complete."); + + } + + + /************************************************************************** + * Sets up and runs and item server + **************************************************************************/ + @Override + public Integer start(String[] args) + { + try + { + //initialise everything + standardInitialisation( args ); + } + catch( Exception ex ) + { + Logger.error(ex); + Logger.die("Startup failed"); + } + return null; + } + + public static void main(String[] args) { + server = new StandardServer(); + AbstractMain.runningAsWrapper = true; + WrapperManager.start( server, args ); + } + + /** + * + */ + @Override + public void controlEvent(int event) { + if (WrapperManager.isControlledByNativeWrapper()) { + // The Wrapper will take care of this event + } else { + // We are not being controlled by the Wrapper, so + // handle the event ourselves. + if ((event == WrapperManager.WRAPPER_CTRL_C_EVENT) || + (event == WrapperManager.WRAPPER_CTRL_CLOSE_EVENT) || + (event == WrapperManager.WRAPPER_CTRL_SHUTDOWN_EVENT)){ + WrapperManager.stop(0); + } + } + + } + + /************************************************************************** + * Closes all listeners, quits the VM. + * This method should be called to kill the server process + * e.g. from the NT service wrapper + **************************************************************************/ + @Override + public int stop(int arg0) { + try + { + Gateway.close(); + } + catch( Exception ex ) + { + Logger.error(ex); + return 1; + } + + Logger.msg("StandardServer::shutdown - complete. "); + return 0; + } + +} diff --git a/src/main/java/com/c2kernel/process/UserCodeProcess.java b/src/main/java/com/c2kernel/process/UserCodeProcess.java new file mode 100644 index 0000000..7779802 --- /dev/null +++ b/src/main/java/com/c2kernel/process/UserCodeProcess.java @@ -0,0 +1,234 @@ +package com.c2kernel.process; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; + +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.common.InvalidTransitionException; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.entity.agent.Job; +import com.c2kernel.entity.proxy.AgentProxy; +import com.c2kernel.entity.proxy.EntityProxyObserver; +import com.c2kernel.entity.proxy.MemberSubscription; +import com.c2kernel.lifecycle.instance.stateMachine.Transitions; +import com.c2kernel.persistency.ClusterStorage; +import com.c2kernel.utils.Logger; + +/************************************************************************** + * + * $Revision: 1.31 $ + * $Date: 2004/10/21 08:02:19 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ +public class UserCodeProcess extends StandardClient implements EntityProxyObserver, Runnable { + protected AgentProxy agent; + static boolean active = true; + ArrayList ignoredPaths = new ArrayList(); + HashMap jobs; + + public UserCodeProcess(String agentName, String agentPass) { + // login - try for a while in case server hasn't imported our user yet + for (int i=1;i<6;i++) { + try { + Logger.msg("Login attempt "+i+" of 5"); + agent = Gateway.connect(agentName, agentPass); + break; + } catch (InvalidDataException ex) { + Logger.error("Could not log in."); + Logger.error(ex); + try { + Thread.sleep(5000); + } catch (InterruptedException ex2) { } + } + } + System.out.println(getDesc()+" initialised for " + agentName); + } + + @Override + public void run() { + Thread.currentThread().setName("Usercode Process"); + jobs = new HashMap(); + // subscribe to job list + agent.subscribe(new MemberSubscription(this, ClusterStorage.JOB, true)); + while (active) { + Job thisJob = null; + synchronized (jobs) { + if (jobs.size() > 0) { + thisJob = getJob(jobs, Transitions.COMPLETE); + if (thisJob == null) + thisJob = getJob(jobs, Transitions.START); + if (thisJob == null) + thisJob = getJob(jobs, Transitions.SUSPEND); + if (thisJob == null) + thisJob = getJob(jobs, Transitions.RESUME); + + if (thisJob == null) { + Logger.error("No supported jobs, but joblist is not empty! Discarding remaining jobs"); + jobs.clear(); + } + else + jobs.remove(ClusterStorage.getPath(thisJob)); + } + } + + if (thisJob != null) { + String jobKey = thisJob.getItemSysKey()+":"+thisJob.getStepPath(); + try { + if (thisJob.getPossibleTransition()==Transitions.START) { + Logger.msg(5, "Testing start conditions"); + boolean start = assessStartConditions(thisJob); + if (start) { + Logger.msg(5, "Attempting to start"); + agent.execute(thisJob); + } + else { + Logger.msg(5, "Start conditions failed "+thisJob.getStepName()+" in "+thisJob.getItemSysKey()); + } + } + else if (thisJob.getPossibleTransition()==Transitions.COMPLETE) { + Logger.msg(5, "Executing logic"); + runUCLogic(thisJob); + if (ignoredPaths.contains(jobKey)) + ignoredPaths.remove(jobKey); + } + else if (thisJob.getPossibleTransition()==Transitions.SUSPEND) { + if (ignoredPaths.contains(jobKey)) + agent.execute(thisJob); + } + else if (thisJob.getPossibleTransition()==Transitions.RESUME) { + if (!ignoredPaths.contains(jobKey)) + agent.execute(thisJob); + } + } catch (InvalidTransitionException ex) { + // must have already been done by someone else - ignore + } catch (Throwable ex) { + Logger.error("Error executing "+Transitions.getTransitionName(thisJob.getPossibleTransition())+" job:"); + Logger.error(ex); + ignoredPaths.add(jobKey); + } + } + try { + synchronized (jobs) { + if (jobs.size() == 0) { + Logger.msg("Sleeping"); + while (active && jobs.size() == 0) + jobs.wait(2000); + } + } + } catch (InterruptedException ex) { } + } + + // shut down + try + { + Gateway.close(); + } + catch( Exception ex ) + { + Logger.error(ex); + } + } + + private static Job getJob(HashMap jobs, int transition) { + for (C2KLocalObject c2kLocalObject : jobs.values()) { + Job thisJob = (Job)c2kLocalObject; + if (thisJob.getPossibleTransition() == transition) { + Logger.msg(1,"================================================================="); + Logger.msg(1, "Got "+Transitions.getTransitionName(transition)+" job for "+thisJob.getStepName()+" in "+thisJob.getItemSysKey()); + return thisJob; + } + } + return null; + } + + public boolean assessStartConditions(Job job) { + // default implementation - has no start conditions. + return true; + } + + public void runUCLogic(Job job) throws Exception { + // default implementation - the agent will execute any scripts defined when we execute + agent.execute(job); + } + + + /** + * Receives job from the AgentProxy. Reactivates thread if sleeping. + */ + @Override + public void add(Job contents) { + synchronized(jobs) { + Logger.msg(7, "Adding "+ClusterStorage.getPath(contents)); + jobs.put(ClusterStorage.getPath(contents), contents); + jobs.notify(); + } + + } + + @Override + public void control(String control, String msg) { + if (control == MemberSubscription.ERROR) + Logger.error("Error in job subscription: "+msg); + } + + /** + * Removes job removal notification from the AgentProxy. + */ + @Override + public void remove(String id) { + synchronized(jobs) { + Logger.msg(7, "Deleting "+id); + jobs.remove(id); + } + } + + public static UserCodeProcess getInstance() throws UnknownHostException { + return new UserCodeProcess(InetAddress.getLocalHost().getHostName(), "uc"); + } + + static public void main(String[] args) + { + int status = 0; + + try + { + Gateway.init(readC2KArgs(args), false); + UserCodeProcess proc = getInstance(); + new Thread(proc).start(); + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + shutdown(); + } + })); + } + catch( Exception ex ) + { + Logger.error(ex); + + try + { + Gateway.close(); + } + catch(Exception ex1) + { + Logger.error(ex1); + } + status = 1; + System.exit(status); + } + } + + public String getDesc() { + return("Usercode Process"); + } + + public static void shutdown() { + active = false; + } + +} -- cgit v1.2.3