From b086f57f56bf0eb9dab9cf321a0f69aaaae84347 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 30 May 2012 08:37:45 +0200 Subject: Initial Maven Conversion --- source/com/c2kernel/process/UserCodeProcess.java | 234 ----------------------- 1 file changed, 234 deletions(-) delete mode 100644 source/com/c2kernel/process/UserCodeProcess.java (limited to 'source/com/c2kernel/process/UserCodeProcess.java') diff --git a/source/com/c2kernel/process/UserCodeProcess.java b/source/com/c2kernel/process/UserCodeProcess.java deleted file mode 100644 index 7779802..0000000 --- a/source/com/c2kernel/process/UserCodeProcess.java +++ /dev/null @@ -1,234 +0,0 @@ -package com.c2kernel.process; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashMap; - -import com.c2kernel.common.InvalidDataException; -import com.c2kernel.common.InvalidTransitionException; -import com.c2kernel.entity.C2KLocalObject; -import com.c2kernel.entity.agent.Job; -import com.c2kernel.entity.proxy.AgentProxy; -import com.c2kernel.entity.proxy.EntityProxyObserver; -import com.c2kernel.entity.proxy.MemberSubscription; -import com.c2kernel.lifecycle.instance.stateMachine.Transitions; -import com.c2kernel.persistency.ClusterStorage; -import com.c2kernel.utils.Logger; - -/************************************************************************** - * - * $Revision: 1.31 $ - * $Date: 2004/10/21 08:02:19 $ - * - * Copyright (C) 2003 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ -public class UserCodeProcess extends StandardClient implements EntityProxyObserver, Runnable { - protected AgentProxy agent; - static boolean active = true; - ArrayList ignoredPaths = new ArrayList(); - HashMap jobs; - - public UserCodeProcess(String agentName, String agentPass) { - // login - try for a while in case server hasn't imported our user yet - for (int i=1;i<6;i++) { - try { - Logger.msg("Login attempt "+i+" of 5"); - agent = Gateway.connect(agentName, agentPass); - break; - } catch (InvalidDataException ex) { - Logger.error("Could not log in."); - Logger.error(ex); - try { - Thread.sleep(5000); - } catch (InterruptedException ex2) { } - } - } - System.out.println(getDesc()+" initialised for " + agentName); - } - - @Override - public void run() { - Thread.currentThread().setName("Usercode Process"); - jobs = new HashMap(); - // subscribe to job list - agent.subscribe(new MemberSubscription(this, ClusterStorage.JOB, true)); - while (active) { - Job thisJob = null; - synchronized (jobs) { - if (jobs.size() > 0) { - thisJob = getJob(jobs, Transitions.COMPLETE); - if (thisJob == null) - thisJob = getJob(jobs, Transitions.START); - if (thisJob == null) - thisJob = getJob(jobs, Transitions.SUSPEND); - if (thisJob == null) - thisJob = getJob(jobs, Transitions.RESUME); - - if (thisJob == null) { - Logger.error("No supported jobs, but joblist is not empty! Discarding remaining jobs"); - jobs.clear(); - } - else - jobs.remove(ClusterStorage.getPath(thisJob)); - } - } - - if (thisJob != null) { - String jobKey = thisJob.getItemSysKey()+":"+thisJob.getStepPath(); - try { - if (thisJob.getPossibleTransition()==Transitions.START) { - Logger.msg(5, "Testing start conditions"); - boolean start = assessStartConditions(thisJob); - if (start) { - Logger.msg(5, "Attempting to start"); - agent.execute(thisJob); - } - else { - Logger.msg(5, "Start conditions failed "+thisJob.getStepName()+" in "+thisJob.getItemSysKey()); - } - } - else if (thisJob.getPossibleTransition()==Transitions.COMPLETE) { - Logger.msg(5, "Executing logic"); - runUCLogic(thisJob); - if (ignoredPaths.contains(jobKey)) - ignoredPaths.remove(jobKey); - } - else if (thisJob.getPossibleTransition()==Transitions.SUSPEND) { - if (ignoredPaths.contains(jobKey)) - agent.execute(thisJob); - } - else if (thisJob.getPossibleTransition()==Transitions.RESUME) { - if (!ignoredPaths.contains(jobKey)) - agent.execute(thisJob); - } - } catch (InvalidTransitionException ex) { - // must have already been done by someone else - ignore - } catch (Throwable ex) { - Logger.error("Error executing "+Transitions.getTransitionName(thisJob.getPossibleTransition())+" job:"); - Logger.error(ex); - ignoredPaths.add(jobKey); - } - } - try { - synchronized (jobs) { - if (jobs.size() == 0) { - Logger.msg("Sleeping"); - while (active && jobs.size() == 0) - jobs.wait(2000); - } - } - } catch (InterruptedException ex) { } - } - - // shut down - try - { - Gateway.close(); - } - catch( Exception ex ) - { - Logger.error(ex); - } - } - - private static Job getJob(HashMap jobs, int transition) { - for (C2KLocalObject c2kLocalObject : jobs.values()) { - Job thisJob = (Job)c2kLocalObject; - if (thisJob.getPossibleTransition() == transition) { - Logger.msg(1,"================================================================="); - Logger.msg(1, "Got "+Transitions.getTransitionName(transition)+" job for "+thisJob.getStepName()+" in "+thisJob.getItemSysKey()); - return thisJob; - } - } - return null; - } - - public boolean assessStartConditions(Job job) { - // default implementation - has no start conditions. - return true; - } - - public void runUCLogic(Job job) throws Exception { - // default implementation - the agent will execute any scripts defined when we execute - agent.execute(job); - } - - - /** - * Receives job from the AgentProxy. Reactivates thread if sleeping. - */ - @Override - public void add(Job contents) { - synchronized(jobs) { - Logger.msg(7, "Adding "+ClusterStorage.getPath(contents)); - jobs.put(ClusterStorage.getPath(contents), contents); - jobs.notify(); - } - - } - - @Override - public void control(String control, String msg) { - if (control == MemberSubscription.ERROR) - Logger.error("Error in job subscription: "+msg); - } - - /** - * Removes job removal notification from the AgentProxy. - */ - @Override - public void remove(String id) { - synchronized(jobs) { - Logger.msg(7, "Deleting "+id); - jobs.remove(id); - } - } - - public static UserCodeProcess getInstance() throws UnknownHostException { - return new UserCodeProcess(InetAddress.getLocalHost().getHostName(), "uc"); - } - - static public void main(String[] args) - { - int status = 0; - - try - { - Gateway.init(readC2KArgs(args), false); - UserCodeProcess proc = getInstance(); - new Thread(proc).start(); - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - shutdown(); - } - })); - } - catch( Exception ex ) - { - Logger.error(ex); - - try - { - Gateway.close(); - } - catch(Exception ex1) - { - Logger.error(ex1); - } - status = 1; - System.exit(status); - } - } - - public String getDesc() { - return("Usercode Process"); - } - - public static void shutdown() { - active = false; - } - -} -- cgit v1.2.3