package SSF.OS.UDP.test; /** * udpStreamServer.java * */ import java.util.*; import com.renesys.raceway.SSF.*; import SSF.OS.*; import SSF.OS.UDP.*; import SSF.OS.Socket.*; import com.renesys.raceway.DML.*; import SSF.Net.*; import SSF.Net.Util.*; import SSF.Util.Random.*; import SSF.Util.Random.RandomStream; /** A simple UDP streaming server application provided as an example. * The server configures itself from the DML file that * specifies the server's wellKnownPort, the maximum datagram_size * ( = payload in bytes) and the send_interval (in seconds) between * sending consecutive chunks of virtual data to the client. *

The client-server protocol is absolutely minimal: the client * sends to server's well known address just one integer that specifies * the total amount of virtual data it requests. After receiving * a client's request, the server spawns a slave server that periodically * sends the datagrams of size datagram_size to the client until * the request is fulfilled. *

The default is no limit on the number of clients, unless the attribute * client_limit is set.

There * is no limit on the number of servers that can coexist in a single Host * as long as each udpServer has a distinct wellKnownPort number. *

If datagram_size is chosen to be larger than allowed by either the local * UDP protocol (udpinit.send_buffer_size), or by the remote UDP * receive buffer (udpinit.rcv_buffer_size), the datagrams will * be dropped - with some diagnostic output if the debug options * are set to true.

The datagram_size and send_interval should be chosen * so that the resulting data bandwidth does not exceed the bitrate * specified on the server's Host link interface, otherwise the interface * will queue the outgoing datagrams and may drop them if configured * with small enough buffer. */ public class udpStreamServer extends ProtocolSession { /********************* Attribute Variables *********************/ boolean prints=false; /** port number used by this server */ int wellKnownPort; /** default maximum server datagram size in bytes */ int datagram_size = 1000; /** time between sending datagrams (in ssf ticks) */ long send_interval = 0; /** size of data request object from matching client */ int request_size; /** "listening" socket of this server */ udpSocket udpsocket; /** max number of clients of this server - default is unlimited */ int client_limit = Integer.MAX_VALUE; /** current number of contemporaneous clients of this server */ int clientNumber = 0; /** Host entity where this pseudo-protocol is installed */ Host localHost; /** Host's IP address */ int localIP; /** Host's NHI address */ String localNHI; /** Host's Socket ProtocolSession */ socketMaster socketms; /** default is don't print verbose debugging information */ boolean debug = false; /** default is show summary session information */ boolean show_report = false; long seed; /*************** The slave server inner class ***************/ class slaveServer extends ProtocolSession { udpStreamServer owner; int nbytesRemaining; int nbytesRequest; int dataSize; int numDatagrams = 0; int numFailures = 0; udpSocket sd; int srcIP; int srcPort; int destIP; int destPort; public slaveServer(int docsize, int local_ip, int local_port, int remote_ip, int remote_port) { owner = udpStreamServer.this; nbytesRemaining = docsize; nbytesRequest = docsize; srcIP = local_ip; srcPort = local_port; destIP = remote_ip; destPort = remote_port; dataSize = owner.datagram_size; //System.out.println("creating slaveServer with destPort="+destPort); } void start() { try { sd = (udpSocket)owner.socketms.socket(this, "udp"); sd.bind(srcIP, srcPort); sd.connect(destIP, destPort, null); // creates a UDP session } catch (ProtocolException e) { System.err.println(e); } senddata(); } void senddata() { if(owner.send_interval > 0){ { (new SSF.OS.Timer(owner.inGraph(), SSF.Net.Net.seconds(owner.send_interval)){ public void callback(){ if (nbytesRemaining > 0){ if((nbytesRemaining - dataSize) < 0) dataSize = nbytesRemaining; sd.write(dataSize, new Continuation(){ public void success() { //nbytesRemaining -= dataSize; if (debug) { if (prints) localInfo(" wrote " + dataSize + " bytes OK"); ++numDatagrams; } } public void failure(int errno) { if (debug) { ++numDatagrams; ++numFailures; if(errno == 1) localInfo(" write failed, exceeds max_datagram_size"); if(errno == 2) localInfo(" write failed, dropped by IP or NIC"); } } }); set(owner.send_interval); } else try { sd.close(null); --owner.clientNumber; if(debug) sessionInfo("sent " + nbytesRequest + "B" + " #pkts " + numDatagrams + " #failed " + numFailures); cancel(); } catch (ProtocolException e) { System.out.println(e); } } }).set(owner.send_interval); } } else { while(nbytesRemaining > 0){ if((nbytesRemaining - dataSize) < 0) dataSize = nbytesRemaining; sd.write(dataSize, new Continuation(){ public void success() { //nbytesRemaining -= dataSize; if (debug ) { if (prints) localInfo(" wrote " + dataSize + " bytes OK"); ++numDatagrams; } } public void failure(int errno) { if (debug) { ++numDatagrams; ++numFailures; if(errno == 1) localInfo(" write failed, exceeds max_datagram_size"); if(errno == 2) localInfo(" write failed, dropped by IP or NIC"); } } }); } try { sd.close(null); --owner.clientNumber; if(debug) sessionInfo("sent " + nbytesRequest + "B" + " #pkts" + numDatagrams + " #failed " + numFailures); } catch (ProtocolException e) { System.out.println(e); } } } /** preamble to slave server diagnostics */ void localInfo(String str){ System.out.println(owner.inGraph().now()/(double)SSF.Net.Net.seconds(1.0) + " udpServer " +localNHI/* IP_s.IPtoString(srcIP)*/ + ":" + srcPort + " " + str); } /** preamble to end2end UDP session diagnostics */ void sessionInfo(String str){ System.out.println(owner.inGraph().now()/(double)SSF.Net.Net.seconds(1.0) + " udpServer " + localNHI/*IP_s.IPtoString(srcIP)*/ + ":" + srcPort + " clnt " + IP_s.IPtoString(destIP) + ":" + destPort + " " + str); } public boolean push(ProtocolMessage message, ProtocolSession fromSession) throws ProtocolException { return false; } } // end of slaveServer /************************* Constructors ************************/ public udpStreamServer() {} /************************ Class Methods ***********************/ /** Server configuration. Supported DML attributes: *

   *  ProtocolSession [ name server use SSF.OS.UDP.test.udpStreamServer
   *    port          %I      # server's well known port number
   *    client_limit  %I      # max number of simultaneous clients,
   *                          # if omitted, no limit
   *
   *    request_size  %I      # client request nominal datagram size (bytes, int)
   *    datagram_size %I      # max server datagram payload size (virtual bytes, int)
   *    send_interval %F      # time interval between consecutive server datagrams, sec
   *                          # if omitted, datagrams are sent back to back
   *
   *    show_report   %S      # print client-server session summary report, true/false
   *    debug         %S      # print verbose client/server diagnostics, true/false
   *  ]
   * 
*/ public void config(Configuration cfg) throws configException { super.config(cfg); String str; if((str = (String)cfg.findSingle("port")) != null) { wellKnownPort = (new Integer(str)).intValue(); if (wellKnownPort > 10000) serverInfo("Warning: udpServer port must be < 10,000."); } if((str = (String)cfg.findSingle("request_size")) != null) request_size = (new Integer(str)).intValue(); if((str = (String)cfg.findSingle("datagram_size")) != null) datagram_size = (new Integer(str)).intValue(); if((str = (String)cfg.findSingle("client_limit")) != null) client_limit = (new Integer(str)).intValue(); str = (String)cfg.findSingle("debug"); if((str != null) && (str.compareTo("true") == 0)) debug = true; str = (String)cfg.findSingle("show_report"); if((str != null) && (str.compareTo("false") == 0)) show_report = false; if((str = (String)cfg.findSingle("send_interval")) != null) { //double interval = (new Double(str)).doubleValue(); send_interval = SSF.Net.Net.seconds((new Double(str)).doubleValue()); } //send interval seed String seedStr=null; seedStr=(String)cfg.findSingle("seed"); if (seedStr!=null) { seed=((long)(new Double(seedStr)).doubleValue()); // System.out.println("seed from file is ="+seed); } } Object[] obj; public void init(){ localHost = (SSF.Net.Host)inGraph(); localNHI = localHost.nhi; try { socketms = (socketMaster)localHost.SessionForName("socket"); udpsocket = (udpSocket)socketms.socket(this, "udp"); localIP = ((NIC)((IP)localHost.SessionForName("ip")) .INTERFACES.elementAt(0)).ipAddr; }catch (ProtocolException e) { System.err.println("udpServer: " + e); } udpsocket.bind(-1, wellKnownPort); udpsocket.listen(client_limit); // creates a UDP session obj = new Object[1]; serv(); } public void serv() { if (prints) System.out.println("Server is serving .."); udpsocket.read(obj, request_size, new Continuation(){ public void success() { if((obj == null) || (obj[0] == null)) { serverInfo("received unknown request"); } else { ++clientNumber; if(clientNumber > client_limit) { if(debug) serverInfo("max # clients exceeded, refused"); --clientNumber; } else { int doc_size = ((Integer)obj[0]).intValue(); if(debug) serverInfo(" request " + doc_size + "B"); // a slave server reuses the master server's port number, // and uses the local IP, remote port and remote IP that were // contained in the service request UDP header slaveServer srv = new slaveServer(doc_size, udpsocket.last_local_ip, wellKnownPort, udpsocket.last_remote_ip, udpsocket.last_remote_port ); srv.start(); } } serv(); } public void failure(int errno) { if(debug) serverInfo("failure reading clnt request"); serv(); } }); // end udpsocket.read } /** preamble to server diagnostics */ void serverInfo(String str){ System.out.println(localHost.now()/(double)SSF.Net.Net.seconds(1.0) + " udpServer " + IP_s.IPtoString(localIP) + ":" + udpsocket.local_port + " " + str); } public boolean push(ProtocolMessage message, ProtocolSession fromSession) throws ProtocolException { return false; } }