summaryrefslogtreecommitdiff
path: root/source/com/c2kernel/process/UserCodeProcess.java
diff options
context:
space:
mode:
authorAndrew Branson <andrew.branson@cern.ch>2012-05-30 08:37:45 +0200
committerAndrew Branson <andrew.branson@cern.ch>2012-05-30 08:37:45 +0200
commitb086f57f56bf0eb9dab9cf321a0f69aaaae84347 (patch)
tree8e6e26e8b7eed6abad7a17b093bdbb55c5e6b1ba /source/com/c2kernel/process/UserCodeProcess.java
parent22088ae8d2d5ff390518dbe1c4372325ffb3a647 (diff)
Initial Maven Conversion
Diffstat (limited to 'source/com/c2kernel/process/UserCodeProcess.java')
-rw-r--r--source/com/c2kernel/process/UserCodeProcess.java234
1 files changed, 0 insertions, 234 deletions
diff --git a/source/com/c2kernel/process/UserCodeProcess.java b/source/com/c2kernel/process/UserCodeProcess.java
deleted file mode 100644
index 7779802..0000000
--- a/source/com/c2kernel/process/UserCodeProcess.java
+++ /dev/null
@@ -1,234 +0,0 @@
-package com.c2kernel.process;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import com.c2kernel.common.InvalidDataException;
-import com.c2kernel.common.InvalidTransitionException;
-import com.c2kernel.entity.C2KLocalObject;
-import com.c2kernel.entity.agent.Job;
-import com.c2kernel.entity.proxy.AgentProxy;
-import com.c2kernel.entity.proxy.EntityProxyObserver;
-import com.c2kernel.entity.proxy.MemberSubscription;
-import com.c2kernel.lifecycle.instance.stateMachine.Transitions;
-import com.c2kernel.persistency.ClusterStorage;
-import com.c2kernel.utils.Logger;
-
-/**************************************************************************
- *
- * $Revision: 1.31 $
- * $Date: 2004/10/21 08:02:19 $
- *
- * Copyright (C) 2003 CERN - European Organization for Nuclear Research
- * All rights reserved.
- **************************************************************************/
-public class UserCodeProcess extends StandardClient implements EntityProxyObserver<Job>, Runnable {
- protected AgentProxy agent;
- static boolean active = true;
- ArrayList<String> ignoredPaths = new ArrayList<String>();
- HashMap<String, C2KLocalObject> jobs;
-
- public UserCodeProcess(String agentName, String agentPass) {
- // login - try for a while in case server hasn't imported our user yet
- for (int i=1;i<6;i++) {
- try {
- Logger.msg("Login attempt "+i+" of 5");
- agent = Gateway.connect(agentName, agentPass);
- break;
- } catch (InvalidDataException ex) {
- Logger.error("Could not log in.");
- Logger.error(ex);
- try {
- Thread.sleep(5000);
- } catch (InterruptedException ex2) { }
- }
- }
- System.out.println(getDesc()+" initialised for " + agentName);
- }
-
- @Override
- public void run() {
- Thread.currentThread().setName("Usercode Process");
- jobs = new HashMap<String, C2KLocalObject>();
- // subscribe to job list
- agent.subscribe(new MemberSubscription<Job>(this, ClusterStorage.JOB, true));
- while (active) {
- Job thisJob = null;
- synchronized (jobs) {
- if (jobs.size() > 0) {
- thisJob = getJob(jobs, Transitions.COMPLETE);
- if (thisJob == null)
- thisJob = getJob(jobs, Transitions.START);
- if (thisJob == null)
- thisJob = getJob(jobs, Transitions.SUSPEND);
- if (thisJob == null)
- thisJob = getJob(jobs, Transitions.RESUME);
-
- if (thisJob == null) {
- Logger.error("No supported jobs, but joblist is not empty! Discarding remaining jobs");
- jobs.clear();
- }
- else
- jobs.remove(ClusterStorage.getPath(thisJob));
- }
- }
-
- if (thisJob != null) {
- String jobKey = thisJob.getItemSysKey()+":"+thisJob.getStepPath();
- try {
- if (thisJob.getPossibleTransition()==Transitions.START) {
- Logger.msg(5, "Testing start conditions");
- boolean start = assessStartConditions(thisJob);
- if (start) {
- Logger.msg(5, "Attempting to start");
- agent.execute(thisJob);
- }
- else {
- Logger.msg(5, "Start conditions failed "+thisJob.getStepName()+" in "+thisJob.getItemSysKey());
- }
- }
- else if (thisJob.getPossibleTransition()==Transitions.COMPLETE) {
- Logger.msg(5, "Executing logic");
- runUCLogic(thisJob);
- if (ignoredPaths.contains(jobKey))
- ignoredPaths.remove(jobKey);
- }
- else if (thisJob.getPossibleTransition()==Transitions.SUSPEND) {
- if (ignoredPaths.contains(jobKey))
- agent.execute(thisJob);
- }
- else if (thisJob.getPossibleTransition()==Transitions.RESUME) {
- if (!ignoredPaths.contains(jobKey))
- agent.execute(thisJob);
- }
- } catch (InvalidTransitionException ex) {
- // must have already been done by someone else - ignore
- } catch (Throwable ex) {
- Logger.error("Error executing "+Transitions.getTransitionName(thisJob.getPossibleTransition())+" job:");
- Logger.error(ex);
- ignoredPaths.add(jobKey);
- }
- }
- try {
- synchronized (jobs) {
- if (jobs.size() == 0) {
- Logger.msg("Sleeping");
- while (active && jobs.size() == 0)
- jobs.wait(2000);
- }
- }
- } catch (InterruptedException ex) { }
- }
-
- // shut down
- try
- {
- Gateway.close();
- }
- catch( Exception ex )
- {
- Logger.error(ex);
- }
- }
-
- private static Job getJob(HashMap<String, C2KLocalObject> jobs, int transition) {
- for (C2KLocalObject c2kLocalObject : jobs.values()) {
- Job thisJob = (Job)c2kLocalObject;
- if (thisJob.getPossibleTransition() == transition) {
- Logger.msg(1,"=================================================================");
- Logger.msg(1, "Got "+Transitions.getTransitionName(transition)+" job for "+thisJob.getStepName()+" in "+thisJob.getItemSysKey());
- return thisJob;
- }
- }
- return null;
- }
-
- public boolean assessStartConditions(Job job) {
- // default implementation - has no start conditions.
- return true;
- }
-
- public void runUCLogic(Job job) throws Exception {
- // default implementation - the agent will execute any scripts defined when we execute
- agent.execute(job);
- }
-
-
- /**
- * Receives job from the AgentProxy. Reactivates thread if sleeping.
- */
- @Override
- public void add(Job contents) {
- synchronized(jobs) {
- Logger.msg(7, "Adding "+ClusterStorage.getPath(contents));
- jobs.put(ClusterStorage.getPath(contents), contents);
- jobs.notify();
- }
-
- }
-
- @Override
- public void control(String control, String msg) {
- if (control == MemberSubscription.ERROR)
- Logger.error("Error in job subscription: "+msg);
- }
-
- /**
- * Removes job removal notification from the AgentProxy.
- */
- @Override
- public void remove(String id) {
- synchronized(jobs) {
- Logger.msg(7, "Deleting "+id);
- jobs.remove(id);
- }
- }
-
- public static UserCodeProcess getInstance() throws UnknownHostException {
- return new UserCodeProcess(InetAddress.getLocalHost().getHostName(), "uc");
- }
-
- static public void main(String[] args)
- {
- int status = 0;
-
- try
- {
- Gateway.init(readC2KArgs(args), false);
- UserCodeProcess proc = getInstance();
- new Thread(proc).start();
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- shutdown();
- }
- }));
- }
- catch( Exception ex )
- {
- Logger.error(ex);
-
- try
- {
- Gateway.close();
- }
- catch(Exception ex1)
- {
- Logger.error(ex1);
- }
- status = 1;
- System.exit(status);
- }
- }
-
- public String getDesc() {
- return("Usercode Process");
- }
-
- public static void shutdown() {
- active = false;
- }
-
-}