/* * $Author: rahul_kumar $ * $Id: NioSocket.java,v 1.22 2003/10/15 17:55:42 rahul_kumar Exp $ * * Copyright (c) Rahul Kumar, 2003. All rights reserved. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt * The latest copy of this software may be found on http://jniosocket.sourceforge.net/ * */ package raining.core; import java.util.*; import java.io.*; import java.nio.*; import java.net.*; import java.nio.channels.*; import java.nio.charset.*; /** * A Java 1.4.2 NIO-based non-blocking I/O framework, with buffered * input and output. * This abstracts the NIO, so that a user may extend this class and only * override some events. It also tries to be as functional as possible * so that a user can get away with as little overriding as * possible.
* Many fields are public - deliberately so that applications arent * slowed down using a get/set method. Anyone doing NIO work would be * sensible enough to use these values sensibly :)
* Some event handles could have passed data to user, but due to tricky * buffer handling, i didnt do that.
* * I may extend this class, and move some code down to extended classes, * as there can be several usages of an NIO socket. * * DONE: allow gathering writes (setSendData to take ByteBuffer array. * I then need to change send() and check for the highest * array index having some data in it. * output buffering not done - DONE * I shd move complete handles to an extender. * also handle terminate * * This class tries to give behavioir for both a server and a client. * Maybe we should remove the server assumptions and put them into the * Server class. * * Note: pls do NOT compile with versions less than 1.4.2. * -------------------------------------------------------------------- * Date Version Change * (over which change was made) * 2003-10-12 1.1.9: i have removed calls to handle_terminator, this can * be handled by a server, or even our generic server. * Added is_read_complete * 2003-12-05 1.1.9: Added setSendData (ByteBuffer) for those who have * read data in as a BB. * 2003-12-06 1.1.10 subsctituting single BB with BB array for gathering * writes * -------------------------------------------------------------------- */ public class NioSocket { static String RCS_ID = "$Id: NioSocket.java,v 1.22 2003/10/15 17:55:42 rahul_kumar Exp $"; /** print debug statements or not. Since NIO can be tricky, we need * debug statements for when things suddenly stop working. However, * for a tested application, you may want to change this to final, * and recompile. */ public static boolean debug = false; public static void main (String args[]){ try { NioSocket nios = new NioSocket (); nios.create_server_socket(8080); NioSocket nio = new NioSocket (); //nio.create_client_socket("www.google.com",80,"GET / HTTP/1.0\r\n\r\n"); nio.create_client_socket("127.0.0.1",80,"GET /examples/jsp/index.html HTTP/1.0\r\n\r\n"); NioSocket.start(); } catch (Exception exc) { System.err.println( "32 EXC:"+ exc.toString()); exc.printStackTrace(); } } // TODO: // The static stuff needs to be moved to a class. // The NioSockets would be added to the class // public static Selector selector = null; /** used for identifier of each socket */ static long counter = 0; /** how many active sockets in system */ public static int activeSocketCount = 0; /** maximum number of active sockets during program lifetime */ static int peakactive = 0; /** when did the system start. */ static long startmillis = 0; static java.util.Date startdate = null; static { System.out.println( " " ); System.out.println( " +--------------------------------------+"); System.out.println( " | RAINING SOCKETS V 1.2pre3c |"); System.out.println( " | Java 1.4 Non Blocking IO Framework |"); System.out.println( " |......................................|"); System.out.println( " | (c) Rahul Kumar, Sep 2003. |"); System.out.println( " |......................................|"); System.out.println( " | Status: Development-beta. |"); System.out.println( " | Functional, tested to 10K,15K conns|"); System.out.println( " +--------------------------------------+"); System.out.println( " " ); System.out.println( new java.util.Date() ); try { selector = Selector.open(); } catch (Exception exc) { System.err.println( "Opening a selector L46 EXC:"+ exc.toString()); exc.printStackTrace(); } } /** this contains a mapping of channel and NioSocket object * */ static Map socketMap = new HashMap(); static void s_read (NioSocket ns) // throws java.io.IOException { ns.handle_read_event(); } static void s_write (NioSocket ns) //throws java.io.IOException { ns.handle_write_event(); } static void s_accept (NioSocket ns) throws java.io.IOException { ns.handle_accept(); } static void s_connect (NioSocket ns) throws java.io.IOException { ns.handle_connect(); } // readwrite /** method that polls the selector/ descriptor set. * acco to unix man page on select, we should not * use a timeout on a select operation since that wd block. * TODO: throwing IO exceptions up means that if one exception * comes, then we come right outa the loop. BAD */ static void poll (long timeout) throws IOException{ //if (timeout < 0) timeout = 0; // remove this !!! if (socketMap.isEmpty()) return; int n = 0; try { n = selector.select(timeout); } catch (Exception exc) { //System.err.println( "Select error 131 EXC:"+ exc.toString()); //exc.printStackTrace(); // control C on_exit_handler(); stop_now(); System.exit(0); } if (n == 0){ return; } Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()){ SelectionKey key = (SelectionKey) it.next(); // RK modified on 20031006 13:17:06 checks added since i am // getting a null on line 226 in connect. // if the next line DOESNT COME, you may remove the isValid // check to speed up. if (key == null || (!key.isValid()) ){ System.err.println( "Nio: loop: 198. got a null key."); continue; } NioSocket ns = (NioSocket) socketMap.get(key.channel()); // RK modified on 20031006 13:17:06 checks added since i am // getting a null on line 226 in connect. if (ns == null){ //System.err.println( "Nio: loop: 209. got a null NioSocket "+key.readyOps()); //System.err.println( "+++: attach:"+ key.attachment()); ns = (NioSocket)key.attachment(); if (ns != null) { //System.err.println( "-id: "+ ns.id); socketMap.put(key.channel(), ns); //ns = ss; } else continue; } // ns null if (debug) System.out.println( "Found a channel, key:"+ns.id); if (key.isAcceptable()) { if (debug) System.out.println( " -- acceptable "); s_accept (ns); } if (key.isConnectable()) { SocketChannel sock = (SocketChannel)ns.channel(); if (debug){ System.out.println( " -- connectable "); } //System.out.println( " -- connect pending: "+sock.isConnectionPending()); // RK modified on 20031006 13:45:46 // added a try here. hope it doesnt slow down the app // this is where CONNECTION TIME OUTS are happening try { sock.finishConnect(); } catch (Exception exc) { //System.err.println( " Nio finish connect 249 EXC:"+ exc.toString()); //System.err.println( " Lost Id: "+ns.id); conn_time_out_handler(ns, key); continue; //exc.printStackTrace(); } if (sock.isConnected()){ // write after you are connected // RK modified on 20031008 23:14:43 on 1.1.8 // NOT UNTILL YOU HAVE SOMETHIGN TO WRITE //Object o = key.attachment(); //SelectionKey sk = registerChannel(selector, ns.channel(), SelectionKey.OP_WRITE); //sk.attach(o); //THE ABOVE MAY BE NEEDED BACK - HOWEVER IT MAKES //PROXY GO INTO A WRITABLE LOOP IF NOTHIGN TO WRITE s_connect (ns); } } if (key.isReadable()){ if (debug) System.out.println( " -- readable "+ns.id); s_read (ns); } else if (key.isWritable()){ if (debug) System.out.println( " -- writable "+ns.id); s_write (ns); } it.remove(); } // while } // poll /** should the select() loop stop after doing given job. * Default is false, since we will keep sending requests. */ public static boolean stopWhenIdle = false; /** stop the server after pending jobs are complete. */ public static void stopWhenIdle (boolean flag){ stopWhenIdle = flag; } public static boolean stopWhenIdle (){ return stopWhenIdle; } /** start the select() loop, with no timeout */ public static void start() throws IOException{ start(0); } /** start the select() loop, with given timeout * */ public static void start(long timeout) throws IOException{ System.err.println( "-- Started --"); startmillis = System.currentTimeMillis(); startdate = new java.util.Date(); System.err.println( "Millis:" + startmillis ); System.err.println( startdate ); while (true){ if (stopWhenIdle){ if (socketMap.isEmpty()){ on_exit_handler(); break; } } // RK modified on 20031006 13:21:17 added try here so we // dont break out if theres an error!! try { poll(timeout); } catch (Exception exc) { //System.err.println( " Nio: start: 321 EXC:"+ exc.toString()); //exc.printStackTrace(); System.err.print('?'); } } } // start /** register the channel with the given operation and return the * selection key, so that the user may attach something to the key. * Note: is the wakeup required at all. * Isnt it wasteful to config the channel to be non-blocking again * and again. */ static SelectionKey registerChannel (Selector selector, SelectableChannel channel, int ops) //throws IOException { if (channel == null){ return null; } SelectionKey sk = null; try { channel.configureBlocking(false); sk = channel.register( selector, ops); // XXX selector.wakeup(); } catch (Exception e) {} return sk; } /** Current count of active sockets. Usually you would just access * activeSocketCount. */ public static int socketCount(){ // could return activeSocketCount; return socketMap.size(); } /** Reset the counter used for sockets - this is the id field, used * mainly in debug statements only. * */ public static void resetSocketCounter( long resetvalue){ counter = resetvalue; } /** May be overriden to handle events related to changing counts. * Naah ! i need to pass an event handler. */ public static void socketCountChangeEvent( int count, int event, long id){ // may be overriden to maintain count related things } // temporarily for sake of blaster. i am running outa memory /** size of read buffer for each NIO socket. * Preferable to keep it * really low like 128 if doing load testing, as in socketblaster. * Increase it when writing a pipelined application, if you expect * many requests coming together. */ public static int READ_BUFFER_SIZE = 256; /** this is in disuse since applications are sending in bytebuffers. */ public static int WRITE_BUFFER_SIZE = 4096; /** default number of indices in the write bb. * Set to 2 for a non-pipelined socket. * Set higher for pipelined, depending on how many embedded parts * a document has. * RK added on 20040219 18:03:37 * I changed from 2 to 1. SocketBlaster was NPE'ing, even though i * was setting there. perhaps i should nt set at all. */ public static int DEFAULT_BB_SIZE = 1; /** Set the read buffer size - high for a client, low for a server. * not needed, but added just so that people know that they can set * this to a sensible value, based on application needs. */ public static void setReadBufferSize(int rbs){ READ_BUFFER_SIZE = rbs; } /** set the write buffer size - keep high for a server, low for a client. */ public static void setWriteBufferSize(int wbs){ WRITE_BUFFER_SIZE = wbs; } /** set the number of indices to allocate for writing. * Set high for MT apps sending data from concurrent processes (e.g. * a document with many images, where the client has sent many * file requests at the same time. * Note that the BB would grow * itself if required by doubling, so an average size can be kept. * Based on the kind of content, you would decide how many GET * requests you can get in one client request. */ public static void setByteBufferArrayLength(int size){ DEFAULT_BB_SIZE = size; } // instance variables INST VAR ByteBuffer rBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); // is there any point in preallocating since i am assigning from a // cache // RK added on 20031206 23:04:53 subsctituting single with array //ByteBuffer wBuffer = ByteBuffer.allocate(WRITE_BUFFER_SIZE); //ByteBuffer[] wBuffers = new ByteBuffer[1]; // i am allowing client to allocate so they can decide its size. // RK added on 20031214 13:57:3 so that pipelining sockets can set // their size. public ByteBuffer[] wBuffers = null; protected int buffer_offset = 0; /** use this to append content of rBuffer - reduce for servers. */ public StringBuffer rsb = new StringBuffer(256); /** the channel itself */ SelectableChannel sc = null; Socket socket = null; boolean connected = false; boolean accepting = false; boolean read_pending = true; // this is a hack till i find out more boolean write_pending = false; // another hack // a channel created due to an accept or due to a connect. // if accepting, then we dont close upon read zero // if connecting then we do. boolean accepting_channel = false; boolean end_of_read = false; boolean serverflag = false; /** Identifier of socket, starting with zero */ public long id = 0; /** accumulated number of bytes read across reads */ public int totalBytesRead = 0; /** Empty constructor to create a non-blocking socketchannel. Use to * create a server or client socketchannel. */ public NioSocket () throws IOException{ this(null); } /** constructor to create a non-blocking socket taking an existing * socket. Usually called by a server socket to register its * incoming sockets. */ public NioSocket (SelectableChannel sockch) throws IOException{ if (sockch != null){ this.sc = sockch; add_to_map(); this.sc.configureBlocking(false); this.connected = true; this.accepting_channel = true; // this has not effect //((SocketChannel)sockch).socket().setKeepAlive(false); } else this.sc = null; this.id = NioSocket.counter++; if (debug) System.out.println( "Created id: "+ this.id); } /** Returns whether this is a server socket. */ public boolean serverflag () { return serverflag; } public static final int DEL_EVENT = -1; public static final int ADD_EVENT = 1; protected void add_to_map () { synchronized (socketMap) { socketMap.put(this.sc, this); } activeSocketCount++; if (activeSocketCount > peakactive) peakactive = activeSocketCount; // deleted this since this could slow things down //socketCountChangeEvent(activeSocketCount, ADD_EVENT, this.id); } /** delete socket from our map. * How about removing from selector keys?? */ protected void del_from_map () { synchronized (socketMap) { Object o = socketMap.remove(this.sc); if (o!=null) activeSocketCount--; else{ return; } } // synch //activeSocketCount--; //socketCountChangeEvent(activeSocketCount, DEL_EVENT, this.id); } /** create a server socket given a port. All server apps will call * this at startup. */ public void create_server_socket(int port) throws IOException, SocketException { this.sc = ServerSocketChannel.open(); ServerSocket serverSocket = ((ServerSocketChannel)sc).socket(); this.sc.configureBlocking(false); serverSocket.setReuseAddress( true ); serverSocket.bind (new InetSocketAddress( port )); this.accepting = true; this.read_pending = false; this.serverflag = true; SelectionKey sk = this.sc.register(selector, SelectionKey.OP_ACCEPT); sk.attach(this); // NEW RK this.add_to_map(); //if (debug) System.out.println( "Created Server Socket on:"+ port); } /** create a client socket on the given host and port, and push the * given data to it immediately. All client programs will call this, * usually many times. */ public void create_client_socket(String host, int port, String path) throws IOException { create_client_socket(host, port); if (path != null) setSendData(path); } /** create a client socket on the given host and port. In this case * the developer would send data in the handle_connect event using * setSendData. */ public void create_client_socket(String host, int port) throws IOException { SocketChannel schannel = SocketChannel.open(); schannel.configureBlocking(false); schannel.connect (new java.net.InetSocketAddress(host, port)); this.sc = schannel; this.connected = false; this.serverflag = false; SelectionKey sk = schannel.register(selector, SelectionKey.OP_CONNECT); //sk.attach(new Long(counter)); sk.attach( this ); if (debug) System.out.println( "created client socket "+this.id+" host:"+host ); this.add_to_map(); wBuffers = new ByteBuffer[DEFAULT_BB_SIZE]; } /** close the socket channel object. */ public void nio_close () { try { this.sc.close(); } catch (Exception exc) { System.err.println( "NIO close() 430EXC:"+ exc.toString()); exc.printStackTrace(); } del_from_map(); } /** determine if this socket is readable. Was earlier used in a loop * that changed interest sets, but now seems to be out of fashion. * May be extended.*/ public boolean readable (){ //return wBuffer.hasRemaining(); // even this should work if (!((SocketChannel)this.sc).isConnected()) return false; return this.read_pending; } /** determine if this socket is writable. Now unused like earlier * method. May be extended. */ public boolean writeable (){ // System.out.println( " remaining:"+ wBuffer.remaining()); //return ( (wBuffer.remaining() < 4096)? true: false); if (!((SocketChannel)this.sc).isConnected()) return false; if (!write_pending) return false; // TODO not sure about the next line at all return ( (wBuffers[buffer_offset].remaining() > 0)? true: false); } protected void handle_read_event(){ handle_read(); } protected void handle_write_event(){ handle_write(); } /** reads data from the socketchannel * returns the number of bytes read. * * if an exception is thrown, i should still call the complete * method. also we should close the socket on -1, or does that only * mean that readig is over, and we can start writing ? * Should this be final ?? */ public int recv () throws IOException { // XXX Shd this be in a loop, just in case more data came // through, during this read. int count = 0, cnt=0; while ((cnt = ((SocketChannel)this.sc).read( rBuffer )) >0){ count += cnt; if (cnt == READ_BUFFER_SIZE){ rsb.append( getReadDataAsString()); } } if (rBuffer.hasRemaining()){ rsb.append( getReadDataAsString()); } //cnt = ((SocketChannel)this.sc).read( rBuffer ); //count = cnt; // XXX should i keep moving to a StringBuffer // i dont know when the overriding method will flip it totalBytesRead += count; if (debug) System.out.println( id+")=>>>>> RECV: got total:"+totalBytesRead+" this total:" +count+" cnt:"+cnt); if ( cnt < 0 ){ // the next line can be dangerous since the protocol // analysis being done in HttpClient will be ignored. // DID i mean a close is dangerous or a // handle_read_complete? //handle_read_complete(getReadData()); rBuffer.clear(); this.read_pending = false; this.end_of_read = true; // say that -1 reached on read handle_close(); // i hope this doesn't make life miserable for user return (count); } else { this.read_pending = true; registerChannel(selector, channel(), SelectionKey.OP_READ); } return count; } /** read handler. * may be overriden. * i have changed void to int, so this one can be invoked. */ public int handle_read(){ int count = -2; // if recv throws, then -2 can be used. try { count = recv(); if (count>0) readcount++; } catch (IOException exc) { System.err.println( " Nio h_r 610 EXC:"+ exc.toString()); System.err.println(" rsb:"+rsb); // usually a connection reset by peer comes here, and the // data is blank. conn_reset_handler(); return count; // added RK modified on 20031011 15:54:09 } catch (Exception exc) { // added RK modified on 20031012 System.err.println( " Nio h_r 611 EXC:"+ exc.toString()); exc.printStackTrace(); // HEY SHOULDnt i close here!! XXX return count; } if (count >0) { if (debug) System.out.println( " ==>>>-- NIO h_r --:"+count+" id:" + this.id); // RK modified on 20031012 13:55:31: added is_read_complete // RK added on 20031214 14:47:43 moved out so we can // override easily. handle_input(); } else if (count == 0) { rBuffer.clear(); // this shd be in recv } return count; } /** called when the read completes (i.e.we get a -1 or disconnect * on the socket. * this can take a long time after data has arrived. * to be investigated. * You may call this from handle_terminator when you implement a * protocol. */ public void handle_read_complete(String mdata){ System.out.println( " ==>>> NIO hrc got ("+totalBytesRead+"):"+ mdata ); } /** convenience method for an extender wanting to trigger the * complete data from handle_read, but not knowing where the data * parameter comes from. */ public void handle_read_complete(){ handle_read_complete(getReadData()); } /** Send the data to the channel, returning the number of bytes * written. * This does the actual write, and * usually you would not override this. */ public int send () throws IOException { int count = 0, cnt = 0; if (!write_pending) { return 0; } while (wBuffers[buffer_offset].hasRemaining()) { cnt = (int) ((SocketChannel)this.sc).write(wBuffers); count += cnt; } if (wBuffers[buffer_offset].remaining()==0) { read_pending = true; registerChannel(selector, channel(), SelectionKey.OP_READ); write_pending = false; // what if there's still more in buffer } if (debug) System.out.println( id+" Written total:"+count+ " cnt="+cnt); return count; } /** handler that handles write. This calls send(). * if nothing more to write on an accepting channel then close * (by calling write_complete). * User would override this if he wants to keep a conversation. */ public void handle_write(){ try { int count = send(); if (count>0) writecount++; // can happen many times on one channel } catch (Exception exc) { System.err.println( "590 EXC:"+ exc.toString()); exc.printStackTrace(); } // if nothing more to write on an accpetig channel then close // user would override this is he wants to keep a conversation. if (accepting_channel){ if (!writeable()){ handle_write_complete(); } } } /** Accepts a connection from a server socket. Usually called from * handle_accept. Returns the SocketChannel recved by server socket. * This registers the new channel for a read. * RK modified on 20031012 13:31:08 * I would prefer to read off whats coming in straight off since its * likely to finish in one operation, than * register for a read. But pushing the register into the handle * makes it difficult for the user. */ public SocketChannel accept (){ ServerSocketChannel server = (ServerSocketChannel)this.sc; SocketChannel schannel = null; try { schannel = server.accept(); //schannel = server.accept().getChannel(); // changed after compilation with // 1.4.0. this never gave an error with 1.4.2 SelectionKey sk = registerChannel(selector, schannel, SelectionKey.OP_READ); // RK remarked on 20031012 11:35:56: attach this or the new // object created in handle ??? XXX sk.attach(this); } catch (Exception exc) { System.err.println( "Nio accept 661 EXC:"+ exc.toString()); exc.printStackTrace(); } this.read_pending = true; return schannel; } /** handle for when a serve gets a connection. * Earlier this was handled up above in the select loop, which makes * Servers simpler. However, a user lost flexibility, thus this has * been moved down * to the handle. This makes it tougher for a user, but he can reject a * connection, or process it at will. * Usually a user, will do an accept here, and recieve the incoming * socket. He will register it for reading: * Note that you can replace line 2 with: * 2. MyNioSocket servsock = new MyNioSocket (schannel, null); * in case you have extended NioSoket. This is why this code is * now here. * * 1. SocketChannel schannel = this.accept(); * 2. NioSocket servsock = new NioSocket (schannel); * OR MyNioSocker servsock = new MyNioSocket (schannel); * */ public void handle_accept(){ SocketChannel schannel = null; NioSocket servsock = null; try { schannel = this.accept(); servsock = new NioSocket (schannel); // RK modified on 20031012 13:30:04: added since a // connecting channel can be read from immediately. Usually // there will be a tiny request that can be handled in one // shot. servsock.handle_read(); } catch (Exception exc) { System.err.println( "Nio handle accept 675 EXC:"+ exc.toString()); exc.printStackTrace(); } accept_accounting(); } /** sum up the number of sockets that have been accepted. * Returns the current count. * RK modified on 20031011 15:21:06: moved out of handle to a * separate method. */ public int accept_accounting(){ acceptcount++; if (++acceptctr==REPORT_AFTER){ acceptctr=0; System.err.println( "A:"+acceptcount+":"+NioSocket.counter+":"+activeSocketCount); } return acceptcount; } /** Connect handler, called when a client socket has connected. You * may want to send a string after connecting, if you created the * socket without the data parameter. Currently just contains a call * to send(). */ public void handle_connect(){ connect_accounting(); try { send(); } catch (Exception exc) { System.err.println( "EXC:"+ exc.toString()); exc.printStackTrace(); } } /** keeps totals of how many connects happened. * Returns the current count. * RK modified on 20031011 15:21:06: moved out of handle to a * separate method. */ public int connect_accounting(){ conncount++; if (++connctr==REPORT_AFTER){ connctr=0; System.err.println("+++ Conn:"+conncount); } return conncount; } /** called when a socket object is to be closed. Currently calls the * close of this object. */ public void handle_close(){ this.nio_close(); } /** returns the selectable channel of this object. */ public SelectableChannel channel(){ return sc; } public static void setDebug (boolean flag){ debug = flag; } /** Set the given data to be sent later. * This should only be used if the caller received the data as a String. * If you received the data as a byte[], use the relevant method. * We cant put a write * operation since it will negate the read that may be happening. We * can *add* a write interest. Otherwise i see nowhere else from where * a WRITE interest is being added. Conversational programs can have * a problem writing. */ public void setSendData(String data) { if (debug) System.out.println( "Nio 837 setting:" + data); setSendData(data.getBytes()); } /** Set the given data to be sent later. * This method was added on * 2003-10-29 since we would usually be reading from a file or a * source in bytes. */ public void setSendData(byte[] data) { setSendData(ByteBuffer.wrap(data)); } /** Set the given bytebuffer to be sent later. * This would be used for sending just the header in response to a * HEAD request, and to send off error responses where there is no * content. * */ public void setSendData(ByteBuffer data) { if (debug) System.out.println( "iNSIDE setSendData BB:"+ id); this.write_pending = true; wBuffers[0] = data; buffer_offset = 0; //wBuffer = data; setWriteInterest(); //RK added on 20031214 13:47:25 if (debug) System.out.println( "EXITING setSendData BB"+ id); } /** Set the given bytebuffer array to be sent later. * This is being used in normal cases when header and content are * being sent. * There is a new situation where the interestOps is blocking, till * the next connection comes in. Or i need to put a select timeout. * Thus i am timing this thing. */ public void setSendData(ByteBuffer[] data) { this.write_pending = true; wBuffers = data; if (debug){ System.out.println( id+") INSIDE setSendData BB[] "); enter_set_send = System.currentTimeMillis() ; System.out.println( " limit, pos "+ wBuffers[0].position()+ ","+ wBuffers[0].limit() ); System.out.println( " limit, pos "+ wBuffers[1].position()+ ","+ wBuffers[1].limit() ); } buffer_offset = data.length-1; //wBuffer = data; setWriteInterest(); //RK added on 20031214 13:47:25 if (debug){ exit_set_send = System.currentTimeMillis() ; System.out.println( id + ") EXITING setSendData BB[] in millis:" + (exit_set_send - enter_set_send)); } } /** Sets interest in writing, without overwriting current interests. * called from setSendData implementations. * //RK added on 20031214 13:47:25 so it would be easier for persons * overriding to set the interest. */ public void setWriteInterest(){ SelectionKey sk = sc.keyFor(selector); int ops = sk.interestOps(); sk.interestOps( ops | SelectionKey.OP_WRITE); selector.wakeup(); // is this required or not ??? } /** get the read buffer as a byte buffer */ public ByteBuffer getReadDataAsBuffer() { return rBuffer; } /** return the read string, converting from the StringBuffer. * This does not reinitialize the buffer, as that may make it * inefficient. * The new StringB line is redundant if called at end of read. * How to remove it ?? * In fact we are only calling it at complete stage, so we should * remove the new. */ public String getReadData () { return new String(rsb.toString()); } //String tmp = new String(rsb.toString()); //rsb = new StringBuffer(READ_BUFFER_SIZE); //return tmp; /** returns the read data as a string after flipping and clearing * the buffer. Called by recv(). * Is this method inefficient - YES. * Is there a quicker alternative for ascii text ? * If this is single threaded, we can move first 2 lines of try into * static block ? * Also can we avoid this conversion altogether - let user decide * what he wants to do with data ? * XXX Should clear go into a finally block ? */ public String getReadDataAsString() { rBuffer.flip(); String ret = null; try { Charset charset=Charset.forName("ISO-8859-1"); CharsetDecoder decoder = charset.newDecoder(); CharBuffer charBuffer = decoder.decode(rBuffer); ret = charBuffer.toString(); rBuffer.clear(); } catch (Exception exc) { System.err.println( "EXC:"+ exc.toString()); exc.printStackTrace(); } return ret; } /** terminator to scan for while taking a request (for servers). If * none, then no scanning will be done. Terminators can change * during a scan, so it is no longer a static */ //static String terminator = null; /** Set the terminator you would like to scan for, when accepting a * request on the socketChannel of a server socket. * In the case of an HTTP Get request, it would be "\r\n\r\n" for * end of header, and "\r\n" for within a header. */ //public static void setTerminator (String term){ terminator = term; } //public static String getTerminator (){ return terminator; } /** the terminator has been detected in an incoming request * (on an accepting channel). Usually the user, will use this * handle, to send a response back. * @param mdata data that has been received. */ public void handle_terminator(String mdata){ // RK modified on 20031008 23:35:18 System.err.println( "HT: NIO SHOULD DO NOTHIGH HERE. <<<----"); //setSendData("NIO OK.\r\n\r\n. Got {\r\n"+mdata+"\r\n}\r\n"); } /** a write has completed on an accepting channel. Currently, this * closes the channel. If you are maintaining a conversation, you * would override this and NOT close. * You may want to close the channel if its just a request/response. * You may wish to clear the read buffer if its a conversation. * Here's where rsb can be cleared along with rbb * Note: i wonder whether we should close the channel or not. */ public void handle_write_complete() { //System.out.println( "NIO please override handle_write_complete"); handle_close(); } /** how many connection resets did i get */ public static int crcount = 0; /** how many connection timeouts did i get */ public static int ctcount = 0; /** how many connection did i get */ public static int conncount = 0; protected static int connctr = 0; /** how many reads did i do */ public static int readcount = 0; /** how many writes did i do */ public static int writecount = 0; /** how many connections did i accept */ public static int acceptcount = 0; protected static int acceptctr = 0; /** report connects and accepts after every n connects. Current * value is 1000 */ public static int REPORT_AFTER=1; // temporarily added due to block in setSend private long exit_set_send; private long enter_set_send; /** Stop polling immediately - closes all sockets as fast as * possible * RK added on 20031008 20:58:03 on 1.1.8 * */ public static void stop_now(){ stopWhenIdle = true; synchronized (socketMap){ Iterator it = socketMap.values().iterator(); while (it.hasNext()){ NioSocket obj =(NioSocket) it.next(); obj.nio_close(); } // while socketMap.clear(); } // sync } // stop_now /** Clear the buffer (StringBuffer) used for reading. THis is * required in a conversation. I am not doing this normally, since * it would be inefficient in the case of single read/writes. * ARRGH ! pathetic but what can i do ? */ public void clear_read_buffer(){ rsb = new StringBuffer(READ_BUFFER_SIZE); } /** handler that handles connection time outs. */ public static void conn_time_out_handler(NioSocket mns, SelectionKey mkey){ System.err.print('X' ); // signified connection time out mns.nio_close(); mkey.cancel(); ctcount++; } /** handler that handles connection resets. Currently, this * - increments a counter * - cancels the key, awakens to selector * - closes the socket channel */ public void conn_reset_handler(){ //System.err.print('X' ); // signified connection time out crcount++; SelectionKey key = this.sc.keyFor(selector); key.cancel(); selector.wakeup(); this.nio_close(); } /** handler for when program exits due to Control-c */ public static void on_exit_handler(){ long endmillis = System.currentTimeMillis(); System.out.println( ); System.out.print( " -- Bye. " ); System.out.println( new java.util.Date() ); System.out.println( "Total Sockets:"+ counter + " Active currently:"+ activeSocketCount + " Peak Active:" + peakactive); System.out.println( "Accepts:"+ acceptcount + " Reads:"+ readcount); System.out.println( "Connects:"+ conncount + " Writes:"+ writecount); System.out.println( "CONNECT RESET:"+ crcount + " CONN TIMED OUT:"+ ctcount); System.out.println( ); System.out.println( "-- Program Started --"); System.out.print( "Millis:" + startmillis); System.out.println( " " + startdate ); System.out.println( "-- Program Ended --"); System.out.print( "Millis:" + endmillis); System.out.println(" "+ new java.util.Date()); System.out.println( "Diff seconds:" + ((endmillis - startmillis)/1000)); System.out.println( ); } /** Logic for deciding if a read has completed, and processing may * be invoked, or channel can be closed, handle_term can be called. * Default if false. One implementation could be: *
     * if (terminator != null)
     *    return (rsb.indexOf(terminator) > -1);
     * return false;
     * 
*/ public boolean is_read_complete(){ return false; } /** Handle data that is being read. * The default implementation:
        if (is_read_complete() ) handle_read_complete(getReadData() );
     
*
this was removed from read method so that it could be overriden easily. * */ public void handle_input (){ if (is_read_complete() ) handle_read_complete(getReadData() ); } } // end of class