// www.pudn.com > jnp-src.rar > GenericHandler.java


/*
 * Java Network Programming, Second Edition
 * Merlin Hughes, Michael Shoffner, Derek Hamner
 * Manning Publications Company; ISBN 188477749X
 *
 * http://nitric.com/jnp/
 *
 * Copyright (c) 1997-1999 Merlin Hughes, Michael Shoffner, Derek Hamner;
 * all rights reserved; see license.txt for details.
 */

import java.io.*; 
import java.util.*; 
 
/**
 * Handler thread for a single client connection.
 *
 * When run, the handler reads a UTF name from its input stream, tries to
 * register a {@link QueueOutputStream} under that name in the shared
 * <code>routes</code> table, writes a boolean to the output stream telling
 * the peer whether the registration succeeded, and - if it did - keeps
 * receiving messages from the input stream and forwarding them to the other
 * entries of the table until the thread is interrupted or an
 * {@link IOException} ends the loop.
 *
 * The <code>routes</code> table is shared and mutated by this class; the
 * registration check-then-put is performed while holding the table's monitor.
 */
public class GenericHandler extends Thread {
  /** Counter backing the numeric suffix of handler thread names. */
  private static int handlerNumber;

  /**
   * Returns the current counter value and advances the counter by one.
   * Synchronized on the class so concurrent constructions get distinct values.
   */
  private static synchronized int nextHandlerNum () {
    int assigned = handlerNumber;
    handlerNumber = assigned + 1;
    return assigned;
  }

  /** Table mapping a registered name to the MessageOutput stored for it. */
  protected Hashtable routes;
  /** Stream the name and subsequent messages are read from. */
  protected InputStream in;
  /** Stream the registration reply is written to; closed when run() ends. */
  protected OutputStream out;
  /** Name read from the input stream at the start of run(). */
  protected String name;
  /** Output registered for this handler in routes; send() skips it. */
  protected MessageOutput myself;

  /**
   * Creates a handler whose thread is named "GenericHandler-" followed by
   * the next handler number.
   *
   * @param routes table of name to MessageOutput entries shared by handlers
   * @param in     stream to read the name and messages from
   * @param out    stream to write replies and outgoing messages to
   */
  public GenericHandler (Hashtable routes, InputStream in, OutputStream out) {
    super ("GenericHandler-" + nextHandlerNum ());
    this.in = in;
    this.out = out;
    this.routes = routes;
  }

  /**
   * Reads the name, runs the registration/routing sequence, and finally
   * closes the output stream. IOExceptions from either phase are reported
   * with printStackTrace and do not propagate.
   */
  public void run () {
    try {
      name = new DataInputStream (in).readUTF ();
      accept (name);
    } catch (IOException ex) {
      ex.printStackTrace ();
    } finally {
      // Only the output stream is closed here; the input stream is left alone.
      try {
        out.close ();
      } catch (IOException ex) {
        ex.printStackTrace ();
      }
    }
  }

  /**
   * Attempts to claim <code>name</code> in the routes table, reports the
   * outcome to the peer as a single boolean, and runs the message exchange
   * when the name was claimed. A claimed name is always removed from the
   * table again before this method returns or throws.
   *
   * @param name key to register under
   * @throws IOException if writing the reply or the message exchange fails
   */
  protected void accept (String name) throws IOException {
    Queue queue = new Queue ();
    boolean registered;
    // Check and insert under one lock so two handlers cannot claim one name.
    synchronized (routes) {
      registered = !routes.containsKey (name);
      if (registered) {
        myself = new QueueOutputStream (queue);
        routes.put (name, myself);
      }
    }
    try {
      DataOutputStream reply = new DataOutputStream (out);
      reply.writeBoolean (registered);
      out.flush ();
      if (registered) {
        execute (queue);
      }
    } finally {
      if (registered) {
        routes.remove (name);
      }
    }
  }

  /**
   * Starts a GenericMessageCopier wired from the given queue to the output
   * stream, runs the routing loop, and calls finish() on the copier when the
   * loop ends for any reason.
   * NOTE(review): the copier presumably drains the queue to the client on
   * its own thread - confirm against GenericMessageCopier.
   *
   * @param queue queue backing this handler's registered output
   * @throws IOException if routing fails
   */
  protected void execute (Queue queue) throws IOException {
    MessageInput fromQueue = new QueueInputStream (queue);
    MessageOutput toPeer = new MessageOutputStream (out);
    GenericMessageCopier copier =
      new GenericMessageCopier (this, fromQueue, toPeer);
    try {
      copier.start ();
      route ();
    } finally {
      copier.finish ();
    }
  }

  /**
   * Receives messages from the input stream until the thread's interrupt
   * flag is found set (checking clears the flag). Each message body is read
   * in full and passed to multicast() when the message carries a target
   * list, or to broadcast() when getTargets() returns null.
   *
   * @throws IOException if receiving or forwarding a message fails
   */
  protected void route () throws IOException {
    RoutingInputStream routingIn =
      new RoutingInputStream (new MessageInputStream (in));
    for (;;) {
      if (Thread.interrupted ()) {
        return;
      }
      routingIn.receive ();
      byte[] payload = new byte[routingIn.available ()];
      routingIn.readFully (payload);
      String[] targets = routingIn.getTargets ();
      if (targets != null) {
        multicast (payload, targets);
      } else {
        broadcast (payload);
      }
    }
  }

  /**
   * Sends the buffer to every output in a snapshot (clone) of the routes
   * table, so entries added or removed during the walk do not affect it.
   *
   * @param buffer message body to deliver
   * @throws IOException if a delivery fails; remaining entries are skipped
   */
  protected void broadcast (byte[] buffer) throws IOException {
    Hashtable snapshot = (Hashtable) routes.clone ();
    for (Enumeration e = snapshot.elements (); e.hasMoreElements (); ) {
      send (buffer, (MessageOutput) e.nextElement ());
    }
  }

  /**
   * Sends the buffer to each named target that currently has an entry in
   * the routes table; names without an entry are skipped.
   *
   * @param buffer  message body to deliver
   * @param targets names to look up in the routes table
   * @throws IOException if a delivery fails; remaining targets are skipped
   */
  protected void multicast (byte[] buffer, String[] targets) throws IOException {
    for (int i = 0; i < targets.length; i++) {
      MessageOutput destination = (MessageOutput) routes.get (targets[i]);
      if (destination == null) {
        continue;
      }
      send (buffer, destination);
    }
  }

  /**
   * Writes the buffer to the given output and calls its send(), holding the
   * output's monitor across both calls. Nothing is sent when the output is
   * this handler's own registered output.
   *
   * @param buffer     message body to deliver
   * @param messageOut destination output
   * @throws IOException if the write or send fails
   */
  protected void send (byte[] buffer, MessageOutput messageOut)
    throws IOException {
    if (messageOut == myself) {
      return;
    }
    synchronized (messageOut) {
      messageOut.write (buffer);
      messageOut.send ();
    }
  }
}