package com.c2kernel.entity.proxy; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.InterruptedIOException; import java.io.PrintWriter; import java.net.Socket; import java.net.SocketException; import java.util.ArrayList; import java.util.Iterator; import com.c2kernel.common.InvalidDataException; import com.c2kernel.utils.Logger; import com.c2kernel.utils.server.SocketHandler; /************************************************************************** * * $Revision: 1.18 $ * $Date: 2005/05/10 11:40:09 $ * * Copyright (C) 2003 CERN - European Organization for Nuclear Research * All rights reserved. **************************************************************************/ public class ProxyClientConnection implements SocketHandler { Socket clientSocket = null; static int clientId = -1; int thisClientId; ArrayList sysKeys; PrintWriter response; BufferedReader request; boolean closing = false; public ProxyClientConnection() { super(); thisClientId = ++clientId; EntityProxyManager.registerProxyClient(this); Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready."); } public String getName() { return "Proxy Client Connection"; } public boolean isBusy() { return clientSocket != null; } public synchronized void setSocket(Socket newSocket) { try { Logger.msg(1, "Proxy Client Connection "+thisClientId+" connect from "+newSocket.getInetAddress()+":"+newSocket.getPort()); newSocket.setSoTimeout(500); clientSocket = newSocket; response = new PrintWriter(clientSocket.getOutputStream(), true); sysKeys = new ArrayList(); } catch (SocketException ex) { Logger.msg("Could not set socket timeout:"); Logger.error(ex); closeSocket(); } catch (IOException ex) { Logger.msg("Could not setup output stream:"); Logger.error(ex); closeSocket(); } } /** * Main loop. Reads proxy commands from the client and acts on them. */ public void run() { Thread.currentThread().setName("Proxy Client Connection: "+clientSocket.getInetAddress()); Logger.msg(7, "ProxyClientConnection "+thisClientId+" - Setting up proxy client connection with "+clientSocket.getInetAddress()); try { request = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); String input = null; ProxyMessage thisMessage; while (clientSocket != null) { try { input = request.readLine(); Logger.msg(9, "ProxyClientConnection "+thisClientId+" - received "+input); thisMessage = new ProxyMessage(input); processMessage(thisMessage); } catch (InterruptedIOException ex) { //timeout } catch (InvalidDataException ex) { // invalid proxy message Logger.error("ProxyClientConnection "+thisClientId+" - Invalid proxy message: "+input); } } } catch (IOException ex) { if (!closing) Logger.error("ProxyClientConnection "+thisClientId+" - Error reading from socket."); } closeSocket(); Logger.msg(1, "ProxyClientConnection "+thisClientId+" closed."); } private void processMessage(ProxyMessage message) throws InvalidDataException { // proxy disconnection if (message.getPath().equals(ProxyMessage.BYEPATH)) { Logger.msg(7, "ProxyClientConnection "+thisClientId+" disconnecting"); closeSocket(); } // proxy checking connection else if (message.getPath().equals(ProxyMessage.PINGPATH)) response.println(ProxyMessage.pingMessage); // new subscription to entity changes else if (message.getPath().equals(ProxyMessage.ADDPATH)) { Logger.msg(7, "ProxyClientConnection "+thisClientId+" subscribed to "+message.getSysKey()); synchronized (sysKeys) { sysKeys.add(new Integer(message.getSysKey())); } } // remove of subscription to entity changes else if (message.getPath().equals(ProxyMessage.DELPATH)) { synchronized (sysKeys) { sysKeys.remove(new Integer(message.getSysKey())); } Logger.msg(7, "ProxyClientConnection "+thisClientId+" unsubscribed from "+message.getSysKey()); } else // unknown message Logger.error("ProxyClientConnection "+thisClientId+" - Unknown message type: "+message); } public synchronized void sendMessage(ProxyMessage message) { if (clientSocket==null) return; // idle boolean relevant = message.getSysKey() == ProxyMessage.NA; synchronized (sysKeys) { for (Iterator iter = sysKeys.iterator(); iter.hasNext() && !relevant;) { Integer thisKey = (Integer)iter.next(); if (thisKey.intValue() == message.getSysKey()) relevant = true; } } if (!relevant) return; // not for our client response.println(message); } public void shutdown() { if (isBusy()) { closing = true; Logger.msg("ProxyClientConnection "+thisClientId+" closing."); closeSocket(); } } public String toString() { if (clientSocket == null) return thisClientId+": idle"; else return thisClientId+": "+clientSocket.getInetAddress(); } private synchronized void closeSocket() { if (clientSocket==null) return; try { request.close(); response.close(); clientSocket.close(); } catch (IOException e) { Logger.error("ProxyClientConnection "+thisClientId+" - Could not close socket."); Logger.error(e); } synchronized (sysKeys) { sysKeys = null; } clientSocket = null; } }