import org.apache.log4j.Category;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.ByteBuffer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.Iterator;
import com.intentia.ec.shared.IncommingChannel;
import com.intentia.ec.shared.Manifest;
import com.intentia.ec.shared.IThreadWorker;
import com.intentia.ec.shared.dbitems.ChannelProperty;
import com.intentia.ec.server.ThreadPool;
public class SampleIncommingChannel extends
IncommingChannel {
// A log4J category, used for logging.
public static Category cat = Category.getInstance
(SampleIncommingChannel.class.getName());
// The port number. Set it to be 112233 as default.
private int myPort = 112233;
// The read timeout. Set it to be 5000 msec as default.
private int readTimeout = 50000;
// We are dealing with incoming sockets, hence we
need a ServerSocketChannel.
private ServerSocketChannel serverChannel;
// Since we are using java.nio we needs a Selector.
private Selector connectSelector;
public SampleIncommingChannel() {
super(cat);
}
public boolean initiateChannel(ChannelProperty[] props) {
// Start by extracting the property for the port value.
String propKey = "Port";
// Retrieve the correct ChannelProperty from the array
ChannelProperty prop = getProperty(props, propKey);
String value = getValue(prop);
// If the property did not exist we return false.
if (prop == null) return false;
if (value == null) {
// If value is null but mandatory we issue a warning and use the
// implemented default value.
if (prop.isMandatory()) {
cat.warn("Using internal default value: " + myPort);
}
}
else {
try {
int tmp = Integer.parseInt(value);
if (tmp <= 0) {
throw new IllegalArgumentException("Negative or zero value.");
}
myPort = tmp;
}
catch (Exception e) {
cat.warn(propKey + " property for SampleIncommingChannel is
invalid. Using internal default value: " + myPort);
}
}
// Do the same for the MaxSubThreads-property.
propKey = "MaxSubThreads";
prop = getProperty(props, propKey);
value = getValue(prop);
if (prop == null) return false;
if (value == null) {
if (prop.isMandatory()) {
cat.warn("Using internal default value: " + maxSubThreads);
}
}
else {
try {
int tmp = Integer.parseInt(value);
if (tmp <= 0) {
throw new IllegalArgumentException("Negative or zero value.");
}
maxSubThreads = tmp;
}
catch (Exception e) {
cat.warn(propKey + " property for SampleIncommingChannel is
invalid. Using internal default value: " + maxSubThreads);
}
}
// Finally the ReadTimeOut-property.
propKey = "ReadTimeOut";
prop = getProperty(props, propKey);
value = getValue(prop);
if (prop == null) return false;
if (value == null) {
if (prop.isMandatory()) {
cat.warn("Using internal default value: " + readTimeout);
}
}
else {
try {
int tmp = Integer.parseInt(value);
if (tmp <= 0) {
throw new IllegalArgumentException("Negative or zero value.");
}
readTimeout = tmp;
}
catch (Exception e) {
cat.warn(propKey + " property for SampleIncommingChannel is invalid.
Using internal default value: " + readTimeout);
}
}
// Since we are going to use sub threads we register this channel to the
// MEC ThreadPool.
ThreadPool.getInstance().register(getClass().getName(), maxSubThreads);
// Everything went fine, therefore return true.
return true;
}
public long getDelayTime() {
return 0;
}
public void runChannel() {
// Create the ServerSocketChannel object and register in
a java.nio fashion.
if (serverChannel == null) {
try {
connectSelector = Selector.open();
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
InetSocketAddress address = new InetSocketAddress(myPort);
serverChannel.socket().bind(address);
serverChannel.register(connectSelector, SelectionKey.OP_ACCEPT);
logDebug(getPlugInName() + " server started");
}
catch (IOException e) {
logError("Error when inititating " + getPlugInName() + ".
Shutting down plug-in.", e);
super.stopPlugIn();
return;
}
}
try {
// Note, the connectSelector.select() will block until a
// connection becomesavailable. That's why
// getDelayTime() returns zero.
if (connectSelector.select() > 0) {
Set readyKeys = connectSelector.selectedKeys();
for (Iterator i = readyKeys.iterator(); i.hasNext();) {
SelectionKey key = (SelectionKey) i.next();
i.remove();
if (!key.isValid()) continue;
if (key.isAcceptable()) {
SocketChannel channel = null;
try {
channel = ((ServerSocketChannel) key.channel()).accept();
channel.configureBlocking(false);
channel.register(connectSelector, SelectionKey.OP_READ);
}
catch (IOException e) {
logError("Error while accepting", e);
try {
channel.socket().shutdownOutput();
channel.close();
}
catch (Exception ex) {
}
}
}
else if (key.isReadable()) {
// The incoming connection is ready to be read. Start a
// worker that will read the message.
SampleSubThreadWorker worker = new SampleSubThreadWorker();
worker.setKey(key);
submitWork(worker);
}
else {
logError("Undefined key interest.");
}
}
}
}
catch (IOException e) {
logError("Error when receiving message.", e);
}
}
public void cleanUpChannel() {
try {
serverChannel.close();
}
catch (Exception e) {
}
try {
connectSelector.close();
}
catch (Exception e) {
}
}
private class SampleSubThreadWorker implements IThreadWorker {
SelectionKey key;
String type;
private Manifest manifest;
public Manifest getManifest() {
return manifest;
}
public void setManifest(Manifest manifest) {
this.manifest = manifest;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public void setKey(SelectionKey key) {
this.key = key;
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
}
public void runWork() {
ByteBuffer buffer = ByteBuffer.allocate(4);
Manifest manifest = Manifest.getManifest();
setManifest(manifest);
SocketChannel channel = null;
try {
channel = (SocketChannel) key.channel();
channel.socket().setSoTimeout(readTimeout);
channel.read(buffer);
buffer.flip();
int length = buffer.asIntBuffer().get();
persistMessage(channel, manifest, length, readTimeout);
queueManifest(manifest);
}
catch (IOException e) {
logError("Error while reading", e);
}
finally {
try {
channel.socket().shutdownOutput();
channel.close();
}
catch (Exception ex) {
}
key.selector().wakeup();
key = null;
}
}
}
}