diff options
| author | abranson <andrew.branson@cern.ch> | 2011-08-04 00:42:34 +0200 |
|---|---|---|
| committer | abranson <andrew.branson@cern.ch> | 2011-08-04 00:42:34 +0200 |
| commit | 0ec8481c10cd8277d84c7c1a785483a0a739e5a0 (patch) | |
| tree | 5f6e5d9ae75193e67e6f3b3dfa488960c5cde1d5 /source/com/c2kernel/process/UserCodeProcess.java | |
| parent | 036cbdba66f804743c4c838ed598d6972c4b3e17 (diff) | |
More code cleanup:
Refactored Entity Proxy Subscription to handle generics better
Rewrote RemoteMap to use TreeMap instead of the internal array for
order. It now sorts its keys by number if they parse, else as strings.
Removed a no-longer-in-progress outcome form class
Diffstat (limited to 'source/com/c2kernel/process/UserCodeProcess.java')
| -rw-r--r-- | source/com/c2kernel/process/UserCodeProcess.java | 71 |
1 files changed, 38 insertions, 33 deletions
diff --git a/source/com/c2kernel/process/UserCodeProcess.java b/source/com/c2kernel/process/UserCodeProcess.java index 18fed06..9c2deab 100644 --- a/source/com/c2kernel/process/UserCodeProcess.java +++ b/source/com/c2kernel/process/UserCodeProcess.java @@ -4,7 +4,6 @@ import java.net.InetAddress; import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import com.c2kernel.common.InvalidDataException;
import com.c2kernel.common.InvalidTransitionException;
@@ -12,7 +11,7 @@ import com.c2kernel.entity.C2KLocalObject; import com.c2kernel.entity.agent.Job;
import com.c2kernel.entity.proxy.AgentProxy;
import com.c2kernel.entity.proxy.EntityProxyObserver;
-import com.c2kernel.entity.proxy.MemberControl;
+import com.c2kernel.entity.proxy.MemberSubscription;
import com.c2kernel.lifecycle.instance.stateMachine.Transitions;
import com.c2kernel.persistency.ClusterStorage;
import com.c2kernel.utils.Logger;
@@ -25,12 +24,12 @@ import com.c2kernel.utils.Logger; * Copyright (C) 2003 CERN - European Organization for Nuclear Research
* All rights reserved.
**************************************************************************/
-public class UserCodeProcess extends StandardClient implements EntityProxyObserver, Runnable {
+public class UserCodeProcess extends StandardClient implements EntityProxyObserver<Job>, Runnable {
protected AgentProxy agent;
static boolean active = true;
ArrayList<String> ignoredPaths = new ArrayList<String>();
HashMap<String, C2KLocalObject> jobs;
-
+
public UserCodeProcess(String agentName, String agentPass) {
// login - try for a while in case server hasn't imported our user yet
for (int i=1;i<6;i++) {
@@ -48,12 +47,13 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv }
System.out.println(getDesc()+" initialised for " + agentName);
}
-
- public void run() {
+
+ @Override
+ public void run() {
Thread.currentThread().setName("Usercode Process");
jobs = new HashMap<String, C2KLocalObject>();
// subscribe to job list
- agent.subscribe(this, ClusterStorage.JOB, true);
+ agent.subscribe(new MemberSubscription<Job>(this, ClusterStorage.JOB, true));
while (active) {
Job thisJob = null;
synchronized (jobs) {
@@ -62,10 +62,10 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv if (thisJob == null)
thisJob = getJob(jobs, Transitions.START);
if (thisJob == null)
- thisJob = getJob(jobs, Transitions.SUSPEND);
+ thisJob = getJob(jobs, Transitions.SUSPEND);
if (thisJob == null)
thisJob = getJob(jobs, Transitions.RESUME);
-
+
if (thisJob == null) {
Logger.error("No supported jobs, but joblist is not empty! Discarding remaining jobs");
jobs.clear();
@@ -74,7 +74,7 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv jobs.remove(ClusterStorage.getPath(thisJob));
}
}
-
+
if (thisJob != null) {
String jobKey = thisJob.getItemSysKey()+":"+thisJob.getStepPath();
try {
@@ -82,8 +82,8 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv Logger.msg(5, "Testing start conditions");
boolean start = assessStartConditions(thisJob);
if (start) {
- Logger.msg(5, "Attempting to start");
- agent.execute(thisJob);
+ Logger.msg(5, "Attempting to start");
+ agent.execute(thisJob);
}
else {
Logger.msg(5, "Start conditions failed "+thisJob.getStepName()+" in "+thisJob.getItemSysKey());
@@ -103,7 +103,7 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv if (!ignoredPaths.contains(jobKey))
agent.execute(thisJob);
}
- } catch (InvalidTransitionException ex) {
+ } catch (InvalidTransitionException ex) {
// must have already been done by someone else - ignore
} catch (Throwable ex) {
Logger.error("Error executing "+Transitions.getTransitionName(thisJob.getPossibleTransition())+" job:");
@@ -121,7 +121,7 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv }
} catch (InterruptedException ex) { }
}
-
+
// shut down
try
{
@@ -132,10 +132,10 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv Logger.error(ex);
}
}
-
- private Job getJob(HashMap jobs, int transition) {
- for (Iterator list = jobs.values().iterator();list.hasNext();) {
- Job thisJob = (Job)list.next();
+
+ private static Job getJob(HashMap<String, C2KLocalObject> jobs, int transition) {
+ for (C2KLocalObject c2kLocalObject : jobs.values()) {
+ Job thisJob = (Job)c2kLocalObject;
if (thisJob.getPossibleTransition() == transition) {
Logger.msg(1,"=================================================================");
Logger.msg(1, "Got "+Transitions.getTransitionName(transition)+" job for "+thisJob.getStepName()+" in "+thisJob.getItemSysKey());
@@ -144,27 +144,23 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv }
return null;
}
-
+
public boolean assessStartConditions(Job job) {
- // default implementation - has no start conditions.
+ // default implementation - has no start conditions.
return true;
}
-
+
public void runUCLogic(Job job) throws Exception {
// default implementation - the agent will execute any scripts defined when we execute
agent.execute(job);
}
-
-
+
+
/**
* Receives job from the AgentProxy. Reactivates thread if sleeping.
*/
- public void add(C2KLocalObject contents) {
- if (contents instanceof MemberControl) {
- MemberControl ctrl = (MemberControl)contents;
- Logger.msg(5,ctrl.toString());
- }
- else
+ @Override
+ public void add(Job contents) {
synchronized(jobs) {
Logger.msg(7, "Adding "+ClusterStorage.getPath(contents));
jobs.put(ClusterStorage.getPath(contents), contents);
@@ -172,11 +168,18 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv }
}
+
+ @Override
+ public void control(String control, String msg) {
+ if (control == MemberSubscription.ERROR)
+ Logger.error("Error in job subscription: "+msg);
+ }
/**
* Removes job removal notification from the AgentProxy.
*/
- public void remove(String id) {
+ @Override
+ public void remove(String id) {
synchronized(jobs) {
Logger.msg(7, "Deleting "+id);
jobs.remove(id);
@@ -186,7 +189,7 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv public static UserCodeProcess getInstance() throws UnknownHostException {
return new UserCodeProcess(InetAddress.getLocalHost().getHostName(), "uc");
}
-
+
static public void main(String[] args)
{
int status = 0;
@@ -197,7 +200,8 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv UserCodeProcess proc = getInstance();
new Thread(proc).start();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- public void run() {
+ @Override
+ public void run() {
shutdown();
}
}));
@@ -218,7 +222,7 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv System.exit(status);
}
}
-
+
public String getDesc() {
return("Usercode Process");
}
@@ -226,4 +230,5 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv public static void shutdown() {
active = false;
}
+
}
|
