// Sample Code - SampleIncommingChannel.package

import org.apache.log4j.Category;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.ByteBuffer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.Iterator;
import com.intentia.ec.shared.IncommingChannel;
import com.intentia.ec.shared.Manifest;
import com.intentia.ec.shared.IThreadWorker;
import com.intentia.ec.shared.dbitems.ChannelProperty;
import com.intentia.ec.server.ThreadPool;

public class SampleIncommingChannel extends 
IncommingChannel {
	// A log4J category, used for logging.
public static Category cat = Category.getInstance
(SampleIncommingChannel.class.getName());
	// The port number. Set it to be 112233 as default.
private int myPort = 112233;
	// The read timeout. Set it to be 5000 msec as default.
private int readTimeout = 50000;
	// We are dealing with incoming sockets, hence we 
need a ServerSocketChannel.
private ServerSocketChannel serverChannel;
	// Since we are using java.nio we needs a Selector.
	private Selector connectSelector;

	public SampleIncommingChannel() {
		super(cat);
	}

	public boolean initiateChannel(ChannelProperty[] props) {
		// Start by extracting the property for the port value.
		String propKey = "Port";
		// Retrieve the correct ChannelProperty from the array
		ChannelProperty prop = getProperty(props, propKey);
		String value = getValue(prop);
		// If the property did not exist we return false.
		if (prop == null) return false;
		if (value == null) {
			// If value is null but mandatory we issue a warning and use the
			// implemented default value.

			if (prop.isMandatory()) {
				cat.warn("Using internal default value: " + myPort);
			}
		}
		else {
			try {
				int tmp = Integer.parseInt(value);
				if (tmp <= 0) {
					throw new IllegalArgumentException("Negative or zero value.");
				}
				myPort = tmp;
			}
			catch (Exception e) {
				cat.warn(propKey + " property for SampleIncommingChannel is 
          invalid. Using internal default value: " + myPort);
			}
		}
		// Do the same for the MaxSubThreads-property.
		propKey = "MaxSubThreads";
		prop = getProperty(props, propKey);
		value = getValue(prop);
		if (prop == null) return false;
		if (value == null) {
			if (prop.isMandatory()) {
				cat.warn("Using internal default value: " + maxSubThreads);
			}
		}
		else {
			try {
				int tmp = Integer.parseInt(value);
				if (tmp <= 0) {
					throw new IllegalArgumentException("Negative or zero value.");
				}
				maxSubThreads = tmp;
			}
			catch (Exception e) {
				cat.warn(propKey + " property for SampleIncommingChannel is 
          invalid. Using internal default value: " + maxSubThreads);
			}
		}

		// Finally the ReadTimeOut-property.
		propKey = "ReadTimeOut";
		prop = getProperty(props, propKey);
		value = getValue(prop);
		if (prop == null) return false;
		if (value == null) {
			if (prop.isMandatory()) {
				cat.warn("Using internal default value: " + readTimeout);
			}
		}
		else {
			try {
				int tmp = Integer.parseInt(value);
				if (tmp <= 0) {
					throw new IllegalArgumentException("Negative or zero value.");
				}
				readTimeout = tmp;
			}
			catch (Exception e) {
				cat.warn(propKey + " property for SampleIncommingChannel is invalid. 
           Using internal default value: " + readTimeout);
			}
		}
		// Since we are going to use sub threads we register this channel to the
		// MEC ThreadPool.
		ThreadPool.getInstance().register(getClass().getName(), maxSubThreads);

		// Everything went fine, therefore return true.
		return true;
	}

	public long getDelayTime() {
		return 0;
	}

	public void runChannel() {
		// Create the ServerSocketChannel object and register in 
      a java.nio fashion.
		if (serverChannel == null) {
			try {
				connectSelector = Selector.open();
				serverChannel = ServerSocketChannel.open();
				serverChannel.configureBlocking(false);
				InetSocketAddress address = new InetSocketAddress(myPort);
				serverChannel.socket().bind(address);
				serverChannel.register(connectSelector, SelectionKey.OP_ACCEPT);
				logDebug(getPlugInName() + " server started");
			}
			catch (IOException e) {
				logError("Error when inititating " + getPlugInName() + ". 
          Shutting down plug-in.", e);
				super.stopPlugIn();
				return;
			}
		}
		try {
			// Note, the connectSelector.select() will block until a
			// connection becomesavailable. That's why 
      //  getDelayTime() returns zero.
			if (connectSelector.select() > 0) {
				Set readyKeys = connectSelector.selectedKeys();
				for (Iterator i = readyKeys.iterator(); i.hasNext();) {
					SelectionKey key = (SelectionKey) i.next();
					i.remove();
					if (!key.isValid()) continue;
					if (key.isAcceptable()) {
						SocketChannel channel = null;
						try {
							channel = ((ServerSocketChannel) key.channel()).accept();
							channel.configureBlocking(false);
							channel.register(connectSelector, SelectionKey.OP_READ);
						}
						catch (IOException e) {
							logError("Error while accepting", e);
							try {
								channel.socket().shutdownOutput();
								channel.close();
							}
							catch (Exception ex) {
							}
						}
					}
					else if (key.isReadable()) {
						// The incoming connection is ready to be read. Start a
						// worker that will read the message.
						SampleSubThreadWorker worker = new SampleSubThreadWorker();
						worker.setKey(key);
						submitWork(worker);
					}
					else {
						logError("Undefined key interest.");
					}
				}
			}
		}
		catch (IOException e) {
			logError("Error when receiving message.", e);
		}
	}

	public void cleanUpChannel() {
		try {
			serverChannel.close();
		}
		catch (Exception e) {
		}
		try {
			connectSelector.close();
		}
		catch (Exception e) {
		}
	}

	private class SampleSubThreadWorker implements IThreadWorker {

		SelectionKey key;
		String type;
		private Manifest manifest;

		public Manifest getManifest() {
			return manifest;
		}

		public void setManifest(Manifest manifest) {
			this.manifest = manifest;
		}

		public String getType() {
			return type;
		}

		public void setType(String type) {
			this.type = type;
		}

		public void setKey(SelectionKey key) {
			this.key = key;
			key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
		}

		public void runWork() {
			ByteBuffer buffer = ByteBuffer.allocate(4);
			Manifest manifest = Manifest.getManifest();
			setManifest(manifest);
			SocketChannel channel = null;
			try {
				channel = (SocketChannel) key.channel();
				channel.socket().setSoTimeout(readTimeout);
				channel.read(buffer);
				buffer.flip();
				int length = buffer.asIntBuffer().get();
				persistMessage(channel, manifest, length, readTimeout);
				queueManifest(manifest);
			}
			catch (IOException e) {
				logError("Error while reading", e);
			}
			finally {
				try {
					channel.socket().shutdownOutput();
					channel.close();
				}
				catch (Exception ex) {
				}
				key.selector().wakeup();
				key = null;
			}
		}
	}
}