// www.pudn.com > jnp-src.rar > GenericHandler.java
/*
* Java Network Programming, Second Edition
* Merlin Hughes, Michael Shoffner, Derek Hamner
* Manning Publications Company; ISBN 188477749X
*
* http://nitric.com/jnp/
*
* Copyright (c) 1997-1999 Merlin Hughes, Michael Shoffner, Derek Hamner;
* all rights reserved; see license.txt for details.
*/
import java.io.*;
import java.util.*;
/**
 * A thread that serves one input/output stream pair of a message router.
 *
 * <p>When run, the handler reads a name from its input stream, tries to
 * register an output queue under that name in the shared routing table,
 * reports the outcome on its output stream and, if registration succeeded,
 * forwards every message it receives to the outputs registered by other
 * handlers. The registration is removed again when the handler finishes.
 */
public class GenericHandler extends Thread {
  /** Counter used to give each handler thread a distinct name. */
  static private int handlerNumber;

  /**
   * Returns the current handler number and then increments the counter.
   * Synchronized on the class so concurrent constructions get distinct values.
   */
  static private synchronized int nextHandlerNum () {
    return handlerNumber ++;
  }

  /** Shared routing table: registered name to {@link MessageOutput}. */
  protected Hashtable routes;

  /** Stream the registration name and all incoming messages are read from. */
  protected InputStream in;

  /** Stream the registration result is written to; closed when run() ends. */
  protected OutputStream out;

  /**
   * Creates a handler named "GenericHandler-N", where N comes from a
   * class-wide counter.
   *
   * @param routes routing table shared with the other handlers
   * @param in     stream to read the name and messages from
   * @param out    stream to write the registration result to
   */
  public GenericHandler (Hashtable routes, InputStream in, OutputStream out) {
    super ("GenericHandler-" + nextHandlerNum ());
    this.routes = routes;
    this.in = in;
    this.out = out;
  }

  /**
   * Name read from the input stream in {@link #run()}. This is a separate
   * field from the thread name passed to the Thread constructor.
   */
  protected String name;

  /**
   * Reads the registration name and hands over to {@link #accept(String)}.
   * An IOException is reported with a stack trace rather than propagated.
   * Both streams are closed before the thread ends, whatever the outcome.
   */
  public void run () {
    try {
      DataInputStream dataIn = new DataInputStream (in);
      name = dataIn.readUTF ();
      accept (name);
    } catch (IOException ex) {
      ex.printStackTrace ();
    } finally {
      // Close the output first. NOTE(review): if both streams share one
      // underlying connection (e.g. a socket - not visible here), this lets
      // a buffering output stream flush before the connection goes away.
      try {
        out.close ();
      } catch (IOException ex) {
        ex.printStackTrace ();
      } finally {
        // The input stream was previously never closed; release it as well.
        try {
          in.close ();
        } catch (IOException ex) {
          ex.printStackTrace ();
        }
      }
    }
  }

  /**
   * The output this handler registered in the routing table, or null while
   * no registration has succeeded. Used by send() to skip this handler.
   */
  protected MessageOutput myself;

  /**
   * Tries to register this handler under the given name, writes the result
   * as a boolean to the output stream and, on success, runs
   * {@link #execute(Queue)}.
   *
   * <p>If the name is already present in the routing table nothing is
   * registered and only {@code false} is written. A successful registration
   * is removed again when this method exits, normally or by exception.
   *
   * @param name name to register under
   * @throws IOException if writing the result or handling messages fails
   */
  protected void accept (String name) throws IOException {
    Queue queue = new Queue ();
    boolean registered = false;
    // Check and insert under one lock so two handlers cannot both claim
    // the same name.
    synchronized (routes) {
      if (!routes.containsKey (name)) {
        registered = true;
        routes.put (name, myself = new QueueOutputStream (queue));
      }
    }
    try {
      new DataOutputStream (out).writeBoolean (registered);
      out.flush ();
      if (registered) {
        execute (queue);
      }
    } finally {
      // Only remove an entry this handler created itself.
      if (registered) {
        routes.remove (name);
      }
    }
  }

  /**
   * Starts a {@link GenericMessageCopier} over the given queue and the
   * output stream, then routes incoming messages until {@link #route()}
   * returns or throws. The copier is always told to finish afterwards.
   * NOTE(review): the copier presumably moves queued messages to the
   * output stream - confirm against GenericMessageCopier.
   *
   * @param queue queue that was registered in the routing table
   * @throws IOException if routing fails
   */
  protected void execute (Queue queue) throws IOException {
    MessageInput queueIn = new QueueInputStream (queue);
    MessageOutput messageOut = new MessageOutputStream (out);
    GenericMessageCopier copier =
      new GenericMessageCopier (this, queueIn, messageOut);
    try {
      copier.start ();
      route ();
    } finally {
      copier.finish ();
    }
  }

  /**
   * Receives messages from the input stream and forwards each one, to every
   * registered output when the message carries no target list and to the
   * listed targets otherwise.
   *
   * <p>The loop ends when this thread is found interrupted; note that
   * {@link Thread#interrupted()} also clears the interrupt flag. Any other
   * exit is by exception.
   *
   * @throws IOException if receiving or forwarding a message fails
   */
  protected void route () throws IOException {
    MessageInputStream messageIn = new MessageInputStream (in);
    RoutingInputStream routingIn = new RoutingInputStream (messageIn);
    while (!Thread.interrupted ()) {
      routingIn.receive ();
      // Size the buffer from available() after receive().
      // NOTE(review): assumes available() then reports the full length of
      // the received message - confirm against RoutingInputStream.
      byte[] buffer = new byte[routingIn.available ()];
      routingIn.readFully (buffer);
      String[] targets = routingIn.getTargets ();
      if (targets == null) {
        broadcast (buffer);
      } else {
        multicast (buffer, targets);
      }
    }
  }

  /**
   * Sends the buffer to every output in the routing table. The loop walks a
   * clone of the table, so entries added or removed meanwhile do not affect
   * the enumeration.
   *
   * @param buffer message body to send
   * @throws IOException if sending to an output fails; outputs not yet
   *         visited are then skipped
   */
  protected void broadcast (byte[] buffer) throws IOException {
    Enumeration dsts = ((Hashtable) routes.clone ()).elements ();
    while (dsts.hasMoreElements ()) {
      MessageOutput messageOut = (MessageOutput) dsts.nextElement ();
      send (buffer, messageOut);
    }
  }

  /**
   * Sends the buffer to each named target that is currently registered.
   * Names with no entry in the routing table are silently skipped.
   *
   * @param buffer  message body to send
   * @param targets names of the intended recipients
   * @throws IOException if sending to an output fails
   */
  protected void multicast (byte[] buffer, String[] targets) throws IOException {
    for (int j = 0; j < targets.length; ++ j) {
      MessageOutput messageOut = (MessageOutput) routes.get (targets[j]);
      if (messageOut != null) {
        send (buffer, messageOut);
      }
    }
  }

  /**
   * Writes the buffer to the given output and sends it, unless the output
   * is this handler's own registration.
   *
   * @param buffer     message body to send
   * @param messageOut destination output
   * @throws IOException if writing or sending fails
   */
  protected void send (byte[] buffer, MessageOutput messageOut)
      throws IOException {
    if (messageOut != myself) {
      // Keep write and send together with respect to other threads that
      // lock the same output.
      synchronized (messageOut) {
        messageOut.write (buffer);
        messageOut.send ();
      }
    }
  }
}