/*
* $Author: rahul_kumar $
* $Id: NioSocket.java,v 1.22 2003/10/15 17:55:42 rahul_kumar Exp $
*
* Copyright (c) Rahul Kumar, 2003. All rights reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
* The latest copy of this software may be found on http://jniosocket.sourceforge.net/
*
*/
package raining.core;
import java.util.*;
import java.io.*;
import java.nio.*;
import java.net.*;
import java.nio.channels.*;
import java.nio.charset.*;
/**
* A Java 1.4.2 NIO-based non-blocking I/O framework, with buffered
* input and output.
* This abstracts the NIO, so that a user may extend this class and only
* override some events. It also tries to be as functional as possible
* so that a user can get away with as little overriding as
* possible.
* Many fields are public - deliberately so that applications arent
* slowed down using a get/set method. Anyone doing NIO work would be
* sensible enough to use these values sensibly :)
* Some event handles could have passed data to user, but due to tricky
* buffer handling, i didnt do that.
*
* I may extend this class, and move some code down to extended classes,
* as there can be several usages of an NIO socket.
*
* DONE: allow gathering writes (setSendData to take ByteBuffer array.
* I then need to change send() and check for the highest
* array index having some data in it.
* output buffering not done - DONE
* I shd move complete handles to an extender.
* also handle terminate
*
* This class tries to give behavioir for both a server and a client.
* Maybe we should remove the server assumptions and put them into the
* Server class.
*
* Note: pls do NOT compile with versions less than 1.4.2.
* --------------------------------------------------------------------
* Date Version Change
* (over which change was made)
* 2003-10-12 1.1.9: i have removed calls to handle_terminator, this can
* be handled by a server, or even our generic server.
* Added is_read_complete
* 2003-12-05 1.1.9: Added setSendData (ByteBuffer) for those who have
* read data in as a BB.
* 2003-12-06 1.1.10 subsctituting single BB with BB array for gathering
* writes
* --------------------------------------------------------------------
*/
public class NioSocket {
static String RCS_ID = "$Id: NioSocket.java,v 1.22 2003/10/15 17:55:42 rahul_kumar Exp $";
/** print debug statements or not. Since NIO can be tricky, we need
* debug statements for when things suddenly stop working. However,
* for a tested application, you may want to change this to final,
* and recompile.
*/
public static boolean debug = false;
public static void main (String args[]){
try {
NioSocket nios = new NioSocket ();
nios.create_server_socket(8080);
NioSocket nio = new NioSocket ();
//nio.create_client_socket("www.google.com",80,"GET / HTTP/1.0\r\n\r\n");
nio.create_client_socket("127.0.0.1",80,"GET /examples/jsp/index.html HTTP/1.0\r\n\r\n");
NioSocket.start();
} catch (Exception exc) { System.err.println( "32 EXC:"+ exc.toString()); exc.printStackTrace(); }
}
// TODO:
// The static stuff needs to be moved to a class.
// The NioSockets would be added to the class
//
public static Selector selector = null;
/** used for identifier of each socket */
static long counter = 0;
/** how many active sockets in system */
public static int activeSocketCount = 0;
/** maximum number of active sockets during program lifetime */
static int peakactive = 0;
/** when did the system start. */
static long startmillis = 0;
static java.util.Date startdate = null;
static {
System.out.println( " " );
System.out.println( " +--------------------------------------+");
System.out.println( " | RAINING SOCKETS V 1.2pre3c |");
System.out.println( " | Java 1.4 Non Blocking IO Framework |");
System.out.println( " |......................................|");
System.out.println( " | (c) Rahul Kumar, Sep 2003. |");
System.out.println( " |......................................|");
System.out.println( " | Status: Development-beta. |");
System.out.println( " | Functional, tested to 10K,15K conns|");
System.out.println( " +--------------------------------------+");
System.out.println( " " );
System.out.println( new java.util.Date() );
try {
selector = Selector.open();
} catch (Exception exc) { System.err.println( "Opening a selector L46 EXC:"+ exc.toString()); exc.printStackTrace(); }
}
/** this contains a mapping of channel and NioSocket object
* */
static Map socketMap = new HashMap();
static void s_read (NioSocket ns)
// throws java.io.IOException
{
ns.handle_read_event();
}
static void s_write (NioSocket ns)
//throws java.io.IOException
{
ns.handle_write_event();
}
static void s_accept (NioSocket ns) throws java.io.IOException {
ns.handle_accept();
}
static void s_connect (NioSocket ns) throws java.io.IOException {
ns.handle_connect();
}
// readwrite
/** method that polls the selector/ descriptor set.
* acco to unix man page on select, we should not
* use a timeout on a select operation since that wd block.
* TODO: throwing IO exceptions up means that if one exception
* comes, then we come right outa the loop. BAD
*/
static void poll (long timeout) throws IOException{
//if (timeout < 0) timeout = 0; // remove this !!!
if (socketMap.isEmpty())
return;
int n = 0;
try {
n = selector.select(timeout);
} catch (Exception exc) {
//System.err.println( "Select error 131 EXC:"+ exc.toString());
//exc.printStackTrace();
// control C
on_exit_handler();
stop_now();
System.exit(0);
}
if (n == 0){
return;
}
Iterator it = selector.selectedKeys().iterator();
while (it.hasNext()){
SelectionKey key = (SelectionKey) it.next();
// RK modified on 20031006 13:17:06 checks added since i am
// getting a null on line 226 in connect.
// if the next line DOESNT COME, you may remove the isValid
// check to speed up.
if (key == null || (!key.isValid()) ){
System.err.println( "Nio: loop: 198. got a null key.");
continue;
}
NioSocket ns = (NioSocket) socketMap.get(key.channel());
// RK modified on 20031006 13:17:06 checks added since i am
// getting a null on line 226 in connect.
if (ns == null){
//System.err.println( "Nio: loop: 209. got a null NioSocket "+key.readyOps());
//System.err.println( "+++: attach:"+ key.attachment());
ns = (NioSocket)key.attachment();
if (ns != null) {
//System.err.println( "-id: "+ ns.id);
socketMap.put(key.channel(), ns);
//ns = ss;
} else
continue;
} // ns null
if (debug)
System.out.println( "Found a channel, key:"+ns.id);
if (key.isAcceptable()) {
if (debug)
System.out.println( " -- acceptable ");
s_accept (ns);
}
if (key.isConnectable()) {
SocketChannel sock = (SocketChannel)ns.channel();
if (debug){
System.out.println( " -- connectable ");
}
//System.out.println( " -- connect pending: "+sock.isConnectionPending());
// RK modified on 20031006 13:45:46
// added a try here. hope it doesnt slow down the app
// this is where CONNECTION TIME OUTS are happening
try {
sock.finishConnect();
} catch (Exception exc) {
//System.err.println( " Nio finish connect 249 EXC:"+ exc.toString());
//System.err.println( " Lost Id: "+ns.id);
conn_time_out_handler(ns, key);
continue;
//exc.printStackTrace();
}
if (sock.isConnected()){
// write after you are connected
// RK modified on 20031008 23:14:43 on 1.1.8
// NOT UNTILL YOU HAVE SOMETHIGN TO WRITE
//Object o = key.attachment();
//SelectionKey sk = registerChannel(selector, ns.channel(), SelectionKey.OP_WRITE);
//sk.attach(o);
//THE ABOVE MAY BE NEEDED BACK - HOWEVER IT MAKES
//PROXY GO INTO A WRITABLE LOOP IF NOTHIGN TO WRITE
s_connect (ns);
}
}
if (key.isReadable()){
if (debug)
System.out.println( " -- readable "+ns.id);
s_read (ns);
}
else
if (key.isWritable()){
if (debug)
System.out.println( " -- writable "+ns.id);
s_write (ns);
}
it.remove();
} // while
} // poll
/** should the select() loop stop after doing given job.
* Default is false, since we will keep sending requests.
*/
public static boolean stopWhenIdle = false;
/** stop the server after pending jobs are complete.
*/
public static void stopWhenIdle (boolean flag){
stopWhenIdle = flag;
}
public static boolean stopWhenIdle (){
return stopWhenIdle;
}
/** start the select() loop, with no timeout */
public static void start() throws IOException{
start(0);
}
/** start the select() loop, with given timeout
* */
public static void start(long timeout) throws IOException{
System.err.println( "-- Started --");
startmillis = System.currentTimeMillis();
startdate = new java.util.Date();
System.err.println( "Millis:" + startmillis );
System.err.println( startdate );
while (true){
if (stopWhenIdle){
if (socketMap.isEmpty()){
on_exit_handler();
break;
}
}
// RK modified on 20031006 13:21:17 added try here so we
// dont break out if theres an error!!
try {
poll(timeout);
} catch (Exception exc) {
//System.err.println( " Nio: start: 321 EXC:"+ exc.toString());
//exc.printStackTrace();
System.err.print('?');
}
}
} // start
/** register the channel with the given operation and return the
* selection key, so that the user may attach something to the key.
* Note: is the wakeup required at all.
* Isnt it wasteful to config the channel to be non-blocking again
* and again.
*/
static SelectionKey registerChannel (Selector selector, SelectableChannel channel, int ops)
//throws IOException
{
if (channel == null){
return null;
}
SelectionKey sk = null;
try {
channel.configureBlocking(false);
sk = channel.register( selector, ops);
// XXX
selector.wakeup();
} catch (Exception e) {}
return sk;
}
/** Current count of active sockets. Usually you would just access
* activeSocketCount.
*/
public static int socketCount(){
// could return activeSocketCount;
return socketMap.size();
}
/** Reset the counter used for sockets - this is the id field, used
* mainly in debug statements only.
* */
public static void resetSocketCounter( long resetvalue){
counter = resetvalue;
}
/** May be overriden to handle events related to changing counts.
* Naah ! i need to pass an event handler.
*/
public static void socketCountChangeEvent( int count, int event, long id){
// may be overriden to maintain count related things
}
// temporarily for sake of blaster. i am running outa memory
/** size of read buffer for each NIO socket.
* Preferable to keep it
* really low like 128 if doing load testing, as in socketblaster.
* Increase it when writing a pipelined application, if you expect
* many requests coming together.
*/
public static int READ_BUFFER_SIZE = 256;
/** this is in disuse since applications are sending in bytebuffers.
*/
public static int WRITE_BUFFER_SIZE = 4096;
/** default number of indices in the write bb.
* Set to 2 for a non-pipelined socket.
* Set higher for pipelined, depending on how many embedded parts
* a document has.
* RK added on 20040219 18:03:37
* I changed from 2 to 1. SocketBlaster was NPE'ing, even though i
* was setting there. perhaps i should nt set at all.
*/
public static int DEFAULT_BB_SIZE = 1;
/** Set the read buffer size - high for a client, low for a server.
* not needed, but added just so that people know that they can set
* this to a sensible value, based on application needs.
*/
public static void setReadBufferSize(int rbs){
READ_BUFFER_SIZE = rbs;
}
/** set the write buffer size - keep high for a server, low for a client.
*/
public static void setWriteBufferSize(int wbs){
WRITE_BUFFER_SIZE = wbs;
}
/** set the number of indices to allocate for writing.
* Set high for MT apps sending data from concurrent processes (e.g.
* a document with many images, where the client has sent many
* file requests at the same time.
* Note that the BB would grow
* itself if required by doubling, so an average size can be kept.
* Based on the kind of content, you would decide how many GET
* requests you can get in one client request.
*/
public static void setByteBufferArrayLength(int size){
DEFAULT_BB_SIZE = size;
}
// instance variables INST VAR
ByteBuffer rBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
// is there any point in preallocating since i am assigning from a
// cache
// RK added on 20031206 23:04:53 subsctituting single with array
//ByteBuffer wBuffer = ByteBuffer.allocate(WRITE_BUFFER_SIZE);
//ByteBuffer[] wBuffers = new ByteBuffer[1];
// i am allowing client to allocate so they can decide its size.
// RK added on 20031214 13:57:3 so that pipelining sockets can set
// their size.
public ByteBuffer[] wBuffers = null;
protected int buffer_offset = 0;
/** use this to append content of rBuffer - reduce for servers. */
public StringBuffer rsb = new StringBuffer(256);
/** the channel itself */
SelectableChannel sc = null;
Socket socket = null;
boolean connected = false;
boolean accepting = false;
boolean read_pending = true;
// this is a hack till i find out more
boolean write_pending = false;
// another hack
// a channel created due to an accept or due to a connect.
// if accepting, then we dont close upon read zero
// if connecting then we do.
boolean accepting_channel = false;
boolean end_of_read = false;
boolean serverflag = false;
/** Identifier of socket, starting with zero */
public long id = 0;
/** accumulated number of bytes read across reads */
public int totalBytesRead = 0;
/** Empty constructor to create a non-blocking socketchannel. Use to
* create a server or client socketchannel.
*/
public NioSocket () throws IOException{
this(null);
}
/** constructor to create a non-blocking socket taking an existing
* socket. Usually called by a server socket to register its
* incoming sockets.
*/
public NioSocket (SelectableChannel sockch) throws IOException{
if (sockch != null){
this.sc = sockch;
add_to_map();
this.sc.configureBlocking(false);
this.connected = true;
this.accepting_channel = true;
// this has not effect
//((SocketChannel)sockch).socket().setKeepAlive(false);
} else
this.sc = null;
this.id = NioSocket.counter++;
if (debug)
System.out.println( "Created id: "+ this.id);
}
/** Returns whether this is a server socket. */
public boolean serverflag () {
return serverflag;
}
public static final int DEL_EVENT = -1;
public static final int ADD_EVENT = 1;
protected void add_to_map () {
synchronized (socketMap) {
socketMap.put(this.sc, this);
}
activeSocketCount++;
if (activeSocketCount > peakactive) peakactive = activeSocketCount;
// deleted this since this could slow things down
//socketCountChangeEvent(activeSocketCount, ADD_EVENT, this.id);
}
/** delete socket from our map.
* How about removing from selector keys??
*/
protected void del_from_map () {
synchronized (socketMap) {
Object o = socketMap.remove(this.sc);
if (o!=null)
activeSocketCount--;
else{
return;
}
} // synch
//activeSocketCount--;
//socketCountChangeEvent(activeSocketCount, DEL_EVENT, this.id);
}
/** create a server socket given a port. All server apps will call
* this at startup.
*/
public void create_server_socket(int port) throws IOException, SocketException {
this.sc = ServerSocketChannel.open();
ServerSocket serverSocket = ((ServerSocketChannel)sc).socket();
this.sc.configureBlocking(false);
serverSocket.setReuseAddress( true );
serverSocket.bind (new InetSocketAddress( port ));
this.accepting = true;
this.read_pending = false;
this.serverflag = true;
SelectionKey sk = this.sc.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(this); // NEW RK
this.add_to_map();
//if (debug)
System.out.println( "Created Server Socket on:"+ port);
}
/** create a client socket on the given host and port, and push the
* given data to it immediately. All client programs will call this,
* usually many times.
*/
public void create_client_socket(String host, int port, String path) throws IOException {
create_client_socket(host, port);
if (path != null)
setSendData(path);
}
/** create a client socket on the given host and port. In this case
* the developer would send data in the handle_connect event using
* setSendData.
*/
public void create_client_socket(String host, int port) throws IOException {
SocketChannel schannel = SocketChannel.open();
schannel.configureBlocking(false);
schannel.connect (new java.net.InetSocketAddress(host, port));
this.sc = schannel;
this.connected = false;
this.serverflag = false;
SelectionKey sk = schannel.register(selector, SelectionKey.OP_CONNECT);
//sk.attach(new Long(counter));
sk.attach( this );
if (debug)
System.out.println( "created client socket "+this.id+" host:"+host );
this.add_to_map();
wBuffers = new ByteBuffer[DEFAULT_BB_SIZE];
}
/** close the socket channel object. */
public void nio_close () {
try {
this.sc.close();
} catch (Exception exc) { System.err.println( "NIO close() 430EXC:"+ exc.toString()); exc.printStackTrace(); }
del_from_map();
}
/** determine if this socket is readable. Was earlier used in a loop
* that changed interest sets, but now seems to be out of fashion.
* May be extended.*/
public boolean readable (){
//return wBuffer.hasRemaining(); // even this should work
if (!((SocketChannel)this.sc).isConnected())
return false;
return this.read_pending;
}
/** determine if this socket is writable. Now unused like earlier
* method. May be extended. */
public boolean writeable (){
// System.out.println( " remaining:"+ wBuffer.remaining());
//return ( (wBuffer.remaining() < 4096)? true: false);
if (!((SocketChannel)this.sc).isConnected())
return false;
if (!write_pending) return false;
// TODO not sure about the next line at all
return ( (wBuffers[buffer_offset].remaining() > 0)? true: false);
}
protected void handle_read_event(){
handle_read();
}
protected void handle_write_event(){
handle_write();
}
/** reads data from the socketchannel
* returns the number of bytes read.
*
* if an exception is thrown, i should still call the complete
* method. also we should close the socket on -1, or does that only
* mean that readig is over, and we can start writing ?
* Should this be final ??
*/
public int recv () throws IOException {
// XXX Shd this be in a loop, just in case more data came
// through, during this read.
int count = 0, cnt=0;
while ((cnt = ((SocketChannel)this.sc).read( rBuffer )) >0){
count += cnt;
if (cnt == READ_BUFFER_SIZE){
rsb.append( getReadDataAsString());
}
}
if (rBuffer.hasRemaining()){
rsb.append( getReadDataAsString());
}
//cnt = ((SocketChannel)this.sc).read( rBuffer );
//count = cnt;
// XXX should i keep moving to a StringBuffer
// i dont know when the overriding method will flip it
totalBytesRead += count;
if (debug)
System.out.println( id+")=>>>>> RECV: got total:"+totalBytesRead+" this total:" +count+" cnt:"+cnt);
if ( cnt < 0 ){
// the next line can be dangerous since the protocol
// analysis being done in HttpClient will be ignored.
// DID i mean a close is dangerous or a
// handle_read_complete?
//handle_read_complete(getReadData());
rBuffer.clear();
this.read_pending = false;
this.end_of_read = true; // say that -1 reached on read
handle_close(); // i hope this doesn't make life miserable for user
return (count);
} else
{
this.read_pending = true;
registerChannel(selector, channel(), SelectionKey.OP_READ);
}
return count;
}
/** read handler.
* may be overriden.
* i have changed void to int, so this one can be invoked.
*/
public int handle_read(){
int count = -2; // if recv throws, then -2 can be used.
try {
count = recv();
if (count>0) readcount++;
} catch (IOException exc) {
System.err.println( " Nio h_r 610 EXC:"+ exc.toString());
System.err.println(" rsb:"+rsb);
// usually a connection reset by peer comes here, and the // data is blank.
conn_reset_handler();
return count; // added RK modified on 20031011 15:54:09
} catch (Exception exc) { // added RK modified on 20031012
System.err.println( " Nio h_r 611 EXC:"+ exc.toString());
exc.printStackTrace();
// HEY SHOULDnt i close here!! XXX
return count;
}
if (count >0) {
if (debug)
System.out.println( " ==>>>-- NIO h_r --:"+count+" id:" + this.id);
// RK modified on 20031012 13:55:31: added is_read_complete
// RK added on 20031214 14:47:43 moved out so we can
// override easily.
handle_input();
} else if (count == 0) {
rBuffer.clear(); // this shd be in recv
}
return count;
}
/** called when the read completes (i.e.we get a -1 or disconnect
* on the socket.
* this can take a long time after data has arrived.
* to be investigated.
* You may call this from handle_terminator when you implement a
* protocol.
*/
public void handle_read_complete(String mdata){
System.out.println( " ==>>> NIO hrc got ("+totalBytesRead+"):"+ mdata );
}
/** convenience method for an extender wanting to trigger the
* complete data from handle_read, but not knowing where the data
* parameter comes from.
*/
public void handle_read_complete(){
handle_read_complete(getReadData());
}
/** Send the data to the channel, returning the number of bytes
* written.
* This does the actual write, and
* usually you would not override this.
*/
public int send () throws IOException {
int count = 0, cnt = 0;
if (!write_pending) {
return 0;
}
while (wBuffers[buffer_offset].hasRemaining())
{
cnt = (int) ((SocketChannel)this.sc).write(wBuffers);
count += cnt;
}
if (wBuffers[buffer_offset].remaining()==0) {
read_pending = true;
registerChannel(selector, channel(), SelectionKey.OP_READ);
write_pending = false; // what if there's still more in buffer
}
if (debug)
System.out.println( id+" Written total:"+count+ " cnt="+cnt);
return count;
}
/** handler that handles write. This calls send().
* if nothing more to write on an accepting channel then close
* (by calling write_complete).
* User would override this if he wants to keep a conversation.
*/
public void handle_write(){
try {
int count = send();
if (count>0) writecount++; // can happen many times on one channel
} catch (Exception exc) { System.err.println( "590 EXC:"+ exc.toString()); exc.printStackTrace(); }
// if nothing more to write on an accpetig channel then close
// user would override this is he wants to keep a conversation.
if (accepting_channel){
if (!writeable()){
handle_write_complete();
}
}
}
/** Accepts a connection from a server socket. Usually called from
* handle_accept. Returns the SocketChannel recved by server socket.
* This registers the new channel for a read.
* RK modified on 20031012 13:31:08
* I would prefer to read off whats coming in straight off since its
* likely to finish in one operation, than
* register for a read. But pushing the register into the handle
* makes it difficult for the user.
*/
public SocketChannel accept (){
ServerSocketChannel server = (ServerSocketChannel)this.sc;
SocketChannel schannel = null;
try {
schannel = server.accept();
//schannel = server.accept().getChannel(); // changed after compilation with
// 1.4.0. this never gave an error with 1.4.2
SelectionKey sk = registerChannel(selector, schannel, SelectionKey.OP_READ);
// RK remarked on 20031012 11:35:56: attach this or the new
// object created in handle ??? XXX
sk.attach(this);
} catch (Exception exc) { System.err.println( "Nio accept 661 EXC:"+ exc.toString()); exc.printStackTrace(); }
this.read_pending = true;
return schannel;
}
/** handle for when a serve gets a connection.
* Earlier this was handled up above in the select loop, which makes
* Servers simpler. However, a user lost flexibility, thus this has
* been moved down
* to the handle. This makes it tougher for a user, but he can reject a
* connection, or process it at will.
* Usually a user, will do an accept here, and recieve the incoming
* socket. He will register it for reading:
* Note that you can replace line 2 with:
* 2. MyNioSocket servsock = new MyNioSocket (schannel, null);
* in case you have extended NioSoket. This is why this code is
* now here.
*
* 1. SocketChannel schannel = this.accept();
* 2. NioSocket servsock = new NioSocket (schannel);
* OR MyNioSocker servsock = new MyNioSocket (schannel);
*
*/
public void handle_accept(){
SocketChannel schannel = null;
NioSocket servsock = null;
try {
schannel = this.accept();
servsock = new NioSocket (schannel);
// RK modified on 20031012 13:30:04: added since a
// connecting channel can be read from immediately. Usually
// there will be a tiny request that can be handled in one
// shot.
servsock.handle_read();
} catch (Exception exc) { System.err.println( "Nio handle accept 675 EXC:"+ exc.toString()); exc.printStackTrace(); }
accept_accounting();
}
/** sum up the number of sockets that have been accepted.
* Returns the current count.
* RK modified on 20031011 15:21:06: moved out of handle to a
* separate method. */
public int accept_accounting(){
acceptcount++;
if (++acceptctr==REPORT_AFTER){
acceptctr=0;
System.err.println( "A:"+acceptcount+":"+NioSocket.counter+":"+activeSocketCount);
}
return acceptcount;
}
/** Connect handler, called when a client socket has connected. You
* may want to send a string after connecting, if you created the
* socket without the data parameter. Currently just contains a call
* to send().
*/
public void handle_connect(){
connect_accounting();
try {
send();
} catch (Exception exc) { System.err.println( "EXC:"+ exc.toString()); exc.printStackTrace(); }
}
/** keeps totals of how many connects happened.
* Returns the current count.
* RK modified on 20031011 15:21:06: moved out of handle to a
* separate method. */
public int connect_accounting(){
conncount++;
if (++connctr==REPORT_AFTER){
connctr=0;
System.err.println("+++ Conn:"+conncount);
}
return conncount;
}
/** called when a socket object is to be closed. Currently calls the
* close of this object.
*/
public void handle_close(){ this.nio_close(); }
/** returns the selectable channel of this object. */
public SelectableChannel channel(){ return sc; }
public static void setDebug (boolean flag){ debug = flag; }
/** Set the given data to be sent later.
* This should only be used if the caller received the data as a String.
* If you received the data as a byte[], use the relevant method.
* We cant put a write
* operation since it will negate the read that may be happening. We
* can *add* a write interest. Otherwise i see nowhere else from where
* a WRITE interest is being added. Conversational programs can have
* a problem writing.
*/
public void setSendData(String data) {
if (debug) System.out.println( "Nio 837 setting:" + data);
setSendData(data.getBytes());
}
/** Set the given data to be sent later.
* This method was added on
* 2003-10-29 since we would usually be reading from a file or a
* source in bytes.
*/
public void setSendData(byte[] data) {
setSendData(ByteBuffer.wrap(data));
}
/** Set the given bytebuffer to be sent later.
* This would be used for sending just the header in response to a
* HEAD request, and to send off error responses where there is no
* content.
*
*/
public void setSendData(ByteBuffer data) {
if (debug)
System.out.println( "iNSIDE setSendData BB:"+ id);
this.write_pending = true;
wBuffers[0] = data;
buffer_offset = 0;
//wBuffer = data;
setWriteInterest(); //RK added on 20031214 13:47:25
if (debug)
System.out.println( "EXITING setSendData BB"+ id);
}
/** Set the given bytebuffer array to be sent later.
* This is being used in normal cases when header and content are
* being sent.
* There is a new situation where the interestOps is blocking, till
* the next connection comes in. Or i need to put a select timeout.
* Thus i am timing this thing.
*/
public void setSendData(ByteBuffer[] data) {
this.write_pending = true;
wBuffers = data;
if (debug){
System.out.println( id+") INSIDE setSendData BB[] ");
enter_set_send = System.currentTimeMillis() ;
System.out.println( " limit, pos "+ wBuffers[0].position()+ ","+ wBuffers[0].limit() );
System.out.println( " limit, pos "+ wBuffers[1].position()+ ","+ wBuffers[1].limit() );
}
buffer_offset = data.length-1;
//wBuffer = data;
setWriteInterest(); //RK added on 20031214 13:47:25
if (debug){
exit_set_send = System.currentTimeMillis() ;
System.out.println( id + ") EXITING setSendData BB[] in millis:" + (exit_set_send - enter_set_send));
}
}
/** Sets interest in writing, without overwriting current interests.
* called from setSendData implementations.
* //RK added on 20031214 13:47:25 so it would be easier for persons
* overriding to set the interest.
*/
public void setWriteInterest(){
SelectionKey sk = sc.keyFor(selector);
int ops = sk.interestOps();
sk.interestOps( ops | SelectionKey.OP_WRITE);
selector.wakeup(); // is this required or not ???
}
/** get the read buffer as a byte buffer */
public ByteBuffer getReadDataAsBuffer() { return rBuffer; }
/** return the read string, converting from the StringBuffer.
* This does not reinitialize the buffer, as that may make it
* inefficient.
* The new StringB line is redundant if called at end of read.
* How to remove it ??
* In fact we are only calling it at complete stage, so we should
* remove the new.
*/
public String getReadData () { return new String(rsb.toString()); }
//String tmp = new String(rsb.toString());
//rsb = new StringBuffer(READ_BUFFER_SIZE);
//return tmp;
/** returns the read data as a string after flipping and clearing
* the buffer. Called by recv().
* Is this method inefficient - YES.
* Is there a quicker alternative for ascii text ?
* If this is single threaded, we can move first 2 lines of try into
* static block ?
* Also can we avoid this conversion altogether - let user decide
* what he wants to do with data ?
* XXX Should clear go into a finally block ?
*/
public String getReadDataAsString() {
rBuffer.flip();
String ret = null;
try {
Charset charset=Charset.forName("ISO-8859-1");
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode(rBuffer);
ret = charBuffer.toString();
rBuffer.clear();
} catch (Exception exc) { System.err.println( "EXC:"+ exc.toString()); exc.printStackTrace(); }
return ret;
}
/** terminator to scan for while taking a request (for servers). If
* none, then no scanning will be done. Terminators can change
* during a scan, so it is no longer a static
*/
//static String terminator = null;
/** Set the terminator you would like to scan for, when accepting a
* request on the socketChannel of a server socket.
* In the case of an HTTP Get request, it would be "\r\n\r\n" for
* end of header, and "\r\n" for within a header.
*/
//public static void setTerminator (String term){ terminator = term; }
//public static String getTerminator (){ return terminator; }
/** the terminator has been detected in an incoming request
* (on an accepting channel). Usually the user, will use this
* handle, to send a response back.
* @param mdata data that has been received.
*/
public void handle_terminator(String mdata){
// RK modified on 20031008 23:35:18
System.err.println( "HT: NIO SHOULD DO NOTHIGH HERE. <<<----");
//setSendData("NIO OK.\r\n\r\n. Got {\r\n"+mdata+"\r\n}\r\n");
}
/** a write has completed on an accepting channel. Currently, this
* closes the channel. If you are maintaining a conversation, you
* would override this and NOT close.
* You may want to close the channel if its just a request/response.
* You may wish to clear the read buffer if its a conversation.
* Here's where rsb can be cleared along with rbb
* Note: i wonder whether we should close the channel or not.
*/
public void handle_write_complete()
{
//System.out.println( "NIO please override handle_write_complete");
handle_close();
}
/** how many connection resets did i get */
public static int crcount = 0;
/** how many connection timeouts did i get */
public static int ctcount = 0;
/** how many connection did i get */
public static int conncount = 0;
protected static int connctr = 0;
/** how many reads did i do */
public static int readcount = 0;
/** how many writes did i do */
public static int writecount = 0;
/** how many connections did i accept */
public static int acceptcount = 0;
protected static int acceptctr = 0;
/** report connects and accepts after every n connects. Current
* value is 1000 */
public static int REPORT_AFTER=1;
// temporarily added due to block in setSend
private long exit_set_send;
private long enter_set_send;
/** Stop polling immediately - closes all sockets as fast as
* possible
* RK added on 20031008 20:58:03 on 1.1.8
* */
public static void stop_now(){
stopWhenIdle = true;
synchronized (socketMap){
Iterator it = socketMap.values().iterator();
while (it.hasNext()){
NioSocket obj =(NioSocket) it.next();
obj.nio_close();
} // while
socketMap.clear();
} // sync
} // stop_now
/** Clear the buffer (StringBuffer) used for reading. THis is
* required in a conversation. I am not doing this normally, since
* it would be inefficient in the case of single read/writes.
* ARRGH ! pathetic but what can i do ? */
public void clear_read_buffer(){
rsb = new StringBuffer(READ_BUFFER_SIZE);
}
/** handler that handles connection time outs.
*/
public static void conn_time_out_handler(NioSocket mns, SelectionKey mkey){
System.err.print('X' ); // signified connection time out
mns.nio_close();
mkey.cancel();
ctcount++;
}
/** handler that handles connection resets. Currently, this
* - increments a counter
* - cancels the key, awakens to selector
* - closes the socket channel
*/
public void conn_reset_handler(){
//System.err.print('X' ); // signified connection time out
crcount++;
SelectionKey key = this.sc.keyFor(selector);
key.cancel();
selector.wakeup();
this.nio_close();
}
/** handler for when program exits due to Control-c
*/
public static void on_exit_handler(){
long endmillis = System.currentTimeMillis();
System.out.println( );
System.out.print( " -- Bye. " );
System.out.println( new java.util.Date() );
System.out.println( "Total Sockets:"+ counter + " Active currently:"+ activeSocketCount + " Peak Active:" + peakactive);
System.out.println( "Accepts:"+ acceptcount + " Reads:"+ readcount);
System.out.println( "Connects:"+ conncount + " Writes:"+ writecount);
System.out.println( "CONNECT RESET:"+ crcount + " CONN TIMED OUT:"+ ctcount);
System.out.println( );
System.out.println( "-- Program Started --");
System.out.print( "Millis:" + startmillis);
System.out.println( " " + startdate );
System.out.println( "-- Program Ended --");
System.out.print( "Millis:" + endmillis);
System.out.println(" "+ new java.util.Date());
System.out.println( "Diff seconds:" + ((endmillis - startmillis)/1000));
System.out.println( );
}
/** Logic for deciding if a read has completed, and processing may
* be invoked, or channel can be closed, handle_term can be called.
* Default if false. One implementation could be:
*
* if (terminator != null)
* return (rsb.indexOf(terminator) > -1);
* return false;
*
*/
public boolean is_read_complete(){ return false; }
/** Handle data that is being read.
* The default implementation:
if (is_read_complete() ) handle_read_complete(getReadData() );
*