From 0ed2c1124cf1b9e49a2ec1fa0126a8df09f9e758 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Tue, 7 Oct 2014 09:18:11 +0200 Subject: Repackage to org.cristalise --- .../java/com/c2kernel/process/UserCodeProcess.java | 275 --------------------- 1 file changed, 275 deletions(-) delete mode 100644 src/main/java/com/c2kernel/process/UserCodeProcess.java (limited to 'src/main/java/com/c2kernel/process/UserCodeProcess.java') diff --git a/src/main/java/com/c2kernel/process/UserCodeProcess.java b/src/main/java/com/c2kernel/process/UserCodeProcess.java deleted file mode 100644 index bc7e23e..0000000 --- a/src/main/java/com/c2kernel/process/UserCodeProcess.java +++ /dev/null @@ -1,275 +0,0 @@ -/** - * This file is part of the CRISTAL-iSE kernel. - * Copyright (c) 2001-2014 The CRISTAL Consortium. All rights reserved. - * - * This library is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as published - * by the Free Software Foundation; either version 3 of the License, or (at - * your option) any later version. - * - * This library is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - * License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this library; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. - * - * http://www.fsf.org/licensing/licenses/lgpl.html - */ -package com.c2kernel.process; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashMap; - -import com.c2kernel.common.InvalidTransitionException; -import com.c2kernel.entity.C2KLocalObject; -import com.c2kernel.entity.agent.Job; -import com.c2kernel.entity.proxy.AgentProxy; -import com.c2kernel.entity.proxy.MemberSubscription; -import com.c2kernel.entity.proxy.ProxyObserver; -import com.c2kernel.persistency.ClusterStorage; -import com.c2kernel.scripting.ErrorInfo; -import com.c2kernel.scripting.ScriptErrorException; -import com.c2kernel.utils.Logger; - -/************************************************************************** - * - * $Revision: 1.31 $ - * $Date: 2004/10/21 08:02:19 $ - * - * Copyright (C) 2003 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ -public class UserCodeProcess extends StandardClient implements ProxyObserver, Runnable { - - // Default state machine transitions - private static final int START = 1; - private static final int COMPLETE = 2; - private static final int SUSPEND = 3; - private static final int RESUME = 4; - - protected AgentProxy agent; - static boolean active = true; - ArrayList ignoredPaths = new ArrayList(); - HashMap errors = new HashMap(); - final HashMap jobs = new HashMap(); - - public UserCodeProcess(String agentName, String agentPass, String resource) { - // login - try for a while in case server hasn't imported our user yet - for (int i=1;i<6;i++) { - try { - Logger.msg("Login attempt "+i+" of 5"); - agent = Gateway.connect(agentName, agentPass, resource); - break; - } catch (Exception ex) { - Logger.error("Could not log in."); - Logger.error(ex); - try { - Thread.sleep(5000); - } catch (InterruptedException ex2) { } - } - } - System.out.println(getDesc()+" initialised for " + agentName); - } - - @Override - public void run() { - Thread.currentThread().setName("Usercode Process"); - // subscribe to job list - agent.subscribe(new MemberSubscription(this, ClusterStorage.JOB, true)); - while (active) { - Job thisJob = null; - synchronized (jobs) { - if (jobs.size() > 0) { - thisJob = getJob(jobs, COMPLETE); - if (thisJob == null) - thisJob = getJob(jobs, START); - if (thisJob == null) - thisJob = getJob(jobs, SUSPEND); - if (thisJob == null) - thisJob = getJob(jobs, RESUME); - - if (thisJob == null) { - Logger.error("No supported jobs, but joblist is not empty! Discarding remaining jobs"); - jobs.clear(); - } - else - jobs.remove(ClusterStorage.getPath(thisJob)); - } - } - - if (thisJob != null) { - String jobKey = thisJob.getItemPath()+":"+thisJob.getStepPath(); - int transitionId = thisJob.getTransition().getId(); - try { - if (transitionId==START) { - Logger.msg(5, "Testing start conditions"); - boolean start = assessStartConditions(thisJob); - if (start) { - Logger.msg(5, "Attempting to start"); - agent.execute(thisJob); - } - else { - Logger.msg(5, "Start conditions failed "+thisJob.getStepName()+" in "+thisJob.getItemPath()); - } - } - else if (transitionId==COMPLETE) { - Logger.msg(5, "Executing logic"); - runUCLogic(thisJob); - if (ignoredPaths.contains(jobKey)) - ignoredPaths.remove(jobKey); - } - else if (transitionId==SUSPEND) { - if (ignoredPaths.contains(jobKey)) { - if (errors.containsKey(jobKey)) { - thisJob.setOutcome(Gateway.getMarshaller().marshall(errors.get(jobKey))); - errors.remove(jobKey); - } - agent.execute(thisJob); - } - } - else if (transitionId==RESUME) { - if (!ignoredPaths.contains(jobKey)) - agent.execute(thisJob); - } - } catch (ScriptErrorException ex) { - errors.put(jobKey, ex.getErrors()); - ignoredPaths.add(jobKey); - } catch (InvalidTransitionException ex) { - // must have already been done by someone else - ignore - } catch (Throwable ex) { - Logger.error("Error executing "+thisJob.getTransition().getName()+" job:"); - Logger.error(ex); - ErrorInfo ei = new ErrorInfo(); - ei.setFatal(); - ei.addError(ex.getClass().getSimpleName()); - ei.addError(ex.getMessage()); - errors.put(jobKey, ei); - ignoredPaths.add(jobKey); - } - } - try { - synchronized (jobs) { - if (jobs.size() == 0) { - Logger.msg("Sleeping"); - while (active && jobs.size() == 0) - jobs.wait(2000); - } - } - } catch (InterruptedException ex) { } - } - - // shut down - try - { - Gateway.close(); - } - catch( Exception ex ) - { - Logger.error(ex); - } - } - - private static Job getJob(HashMap jobs, int transition) { - for (C2KLocalObject c2kLocalObject : jobs.values()) { - Job thisJob = (Job)c2kLocalObject; - if (thisJob.getTransition().getId() == transition) { - Logger.msg(1,"================================================================="); - Logger.msg(1, "Got "+thisJob.getTransition().getName()+" job for "+thisJob.getStepName()+" in "+thisJob.getItemPath()); - return thisJob; - } - } - return null; - } - - public boolean assessStartConditions(Job job) { - // default implementation - has no start conditions. - return true; - } - - public void runUCLogic(Job job) throws Exception { - // default implementation - the agent will execute any scripts defined when we execute - agent.execute(job); - } - - - /** - * Receives job from the AgentProxy. Reactivates thread if sleeping. - */ - @Override - public void add(Job contents) { - synchronized(jobs) { - Logger.msg(7, "Adding "+ClusterStorage.getPath(contents)); - jobs.put(ClusterStorage.getPath(contents), contents); - jobs.notify(); - } - - } - - @Override - public void control(String control, String msg) { - if (MemberSubscription.ERROR.equals(control)) - Logger.error("Error in job subscription: "+msg); - } - - /** - * Removes job removal notification from the AgentProxy. - */ - @Override - public void remove(String id) { - synchronized(jobs) { - Logger.msg(7, "Deleting "+id); - jobs.remove(id); - } - } - - public static UserCodeProcess getInstance() throws UnknownHostException { - return new UserCodeProcess(InetAddress.getLocalHost().getHostName(), "uc", Gateway.getProperties().getString("AuthResource", "Cristal")); - } - - static public void main(String[] args) - { - int status = 0; - - try - { - Gateway.init(readC2KArgs(args)); - UserCodeProcess proc = getInstance(); - new Thread(proc).start(); - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - shutdown(); - } - })); - } - catch( Exception ex ) - { - Logger.error(ex); - - try - { - Gateway.close(); - } - catch(Exception ex1) - { - Logger.error(ex1); - } - status = 1; - System.exit(status); - } - } - - public String getDesc() { - return("Usercode Process"); - } - - public static void shutdown() { - active = false; - } - -} -- cgit v1.2.3