Writing Custom Polling Channel with Retry Reconnect and Back-off
Writing Custom Polling Channel with Retry Reconnect and Back-off
-
Write an incoming communication channel class.
The class must extend another class called
com.infor.ec.communication.PollingChannel
. -
Implement these abstract methods in the super
class:
-
initiateChannel
-
getDelayTime
-
cleanUpChannel
-
-
Implement the
runChannel
method.Use these guidelines:
-
You must add the
persistMessage
andqueueManifest
methods -
Add these properties in the java code:
Retry Reconnect Properties
ConnectionRetryCount, RetryInterval
Back-off Properties
MaximumDelayTime, IncrementDelayTime
-
In your Java code, define how to pull data. See the example Java code for the
SamplePollinChannel.java
Custom Polling Channel.
Use this sample Java code:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import org.apache.log4j.Category; import com.infor.ec.communication.PollingChannel; import com.intentia.ec.server.ThreadPool; import com.intentia.ec.shared.IThreadWorker; import com.intentia.ec.shared.Manifest; import com.intentia.ec.shared.dbitems.ChannelProperty; public class SamplePollinChannel extends PollingChannel { // A log4J category, used for logging. public static Category cat = Category .getInstance(SamplePollinChannel.class.getName()); // The port number. Set it to be 222222 as default. private int myPort = 222222; // We are dealing with incoming sockets, hence we need a // ServerSocketChannel. private ServerSocketChannel serverChannel; // Since we are using java.nio we needs a Selector. private Selector connectSelector; public SamplePollinChannel() { super(cat); } public boolean initiateChannel(ChannelProperty[] props) { // Start by extracting the property for the port value. String propKey = "Port"; // Retrieve the correct ChannelProperty from the array ChannelProperty prop = getProperty(props, propKey); String value = getValue(prop); // If the property did not exist we return false. if (prop == null) return false; if (value == null) { // If value is null but mandatory we issue a warning and use the // implemented default value. if (prop.isMandatory()) { cat.warn("Using internal default value: " + myPort); } } else { try { int tmp = Integer.parseInt(value); if (tmp <= 0) { throw new IllegalArgumentException( "Negative or zero value."); } myPort = tmp; } catch (Exception e) { cat.warn(propKey + " property for SamplePollinChannel is invalid. Using internal default value: " + myPort); } } // Do the same for the MaxSubThreads-property. propKey = "MaxSubThreads"; prop = getProperty(props, propKey); value = getValue(prop); if (prop == null) return false; if (value == null) { if (prop.isMandatory()) { cat.warn("Using internal default value: " + maxSubThreads); } } else { try { int tmp = Integer.parseInt(value); if (tmp <= 0) { throw new IllegalArgumentException( "Negative or zero value."); } maxSubThreads = tmp; } catch (Exception e) { cat.warn(propKey + " property for SamplePollinChannel is invalid. Using internal default value: " + maxSubThreads); } } // Finally the ReadTimeOut-property. propKey = "ReadTimeOut"; prop = getProperty(props, propKey); value = getValue(prop); if (prop == null) return false; if (value == null) { if (prop.isMandatory()) { cat.warn("Using internal default value: " + getReadtimeOut()); } } else { try { int tmp = Integer.parseInt(value); if (tmp <= 0) { throw new IllegalArgumentException( "Negative or zero value."); } setReadtimeOut(tmp); } catch (Exception e) { cat.warn(propKey + " property for SamplePollinChannel is invalid. Using internal default value: " + getReadtimeOut()); } } // Connection Retry when error occurs // Properties involved: ConnectionRetryCount, RetryInterval propKey = "ConnectionRetryCount"; prop = getProperty(props, propKey); value = getValue(prop); if (prop == null) return false; if (value == null) { if (prop.isMandatory()) return false; } else { try { int tmp = Integer.parseInt(value); if (tmp < 0) { logWarn("Using internal default value: " + getConnectionRetryCount()); throw new IllegalArgumentException("Negative or zero value."); } this.setConnectionRetryCount(tmp); } catch (Exception e) { logWarn(propKey + " property for SamplePollinChannel is invalid. Using internal default value: " + getConnectionRetryCount()); } } propKey = "RetryInterval"; prop = getProperty(props, propKey); value = getValue(prop); if (prop == null) return false; if (value == null) { if (prop.isMandatory()) { logWarn("Using internal default value: " + getRetryInterval()); } } else { try { long num = Long.parseLong(value); if (num <= 0) { throw new IllegalArgumentException("Negative or zero value."); } setRetryInterval(num); } catch (Exception e) { logWarn(propKey + " property for SamplePollinChannel is invalid. Using internal default value: " + getRetryInterval()); } } // Back-off feature // Properties involved: MaximumDelayTime, IncrementDelayTime propKey = "MaximumDelayTime"; prop = getProperty(props, propKey); value = getValue(prop); if (prop == null) return false; if (value == null) { if (prop.isMandatory()) { logWarn("Using internal default value: " + getMaximumDelayTime()); } } else { try { long num = Long.parseLong(value); if (num <= 0) { throw new IllegalArgumentException("Negative or zero value."); } setMaximumDelayTime(num); } catch (Exception e) { logWarn(propKey + " property for SamplePollinChannel is invalid. Using internal default value: " + getMaximumDelayTime()); } } propKey = "IncrementDelayTime"; prop = getProperty(props, propKey); value = getValue(prop); if (prop == null) return false; if (value == null) { if (prop.isMandatory()) { logWarn("Using internal default value: " + getIncrementDelayTime()); } } else { try { long num = Long.parseLong(value); if (num <= 0) { throw new IllegalArgumentException("Negative or zero value."); } setIncrementDelayTime(num); } catch (Exception e) { logWarn(propKey + " property for SamplePollinChannel is invalid. Using internal default value: " + getIncrementDelayTime()); } } // Since we are going to use sub threads we register this channel to the // MEC ThreadPool. ThreadPool.getInstance().register(getClass().getName(), maxSubThreads); // Everything went fine, therefore return true. return true; } public long getDelayTime() { return 0; } public void runChannel() { // Create the ServerSocketChannel object and register in a java.nio // fashion. if (serverChannel == null) { try { connectSelector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); InetSocketAddress address = new InetSocketAddress(myPort); serverChannel.socket().bind(address); serverChannel.register(connectSelector, SelectionKey.OP_ACCEPT); logDebug(getPlugInName() + " server started"); } catch (IOException e) { logError("Error when inititating " + getPlugInName() + ". Shutting down plug-in.", e); super.stopPlugIn(); return; } } try { // Note, the connectSelector.select() will block until a // connection becomesavailable. That's why // getDelayTime() returns zero. if (connectSelector.select() > 0) { Set readyKeys = connectSelector.selectedKeys(); for (Iterator i = readyKeys.iterator(); i.hasNext();) { SelectionKey key = (SelectionKey) i.next(); i.remove(); if (!key.isValid()) continue; if (key.isAcceptable()) { SocketChannel channel = null; try { channel = ((ServerSocketChannel) key.channel()) .accept(); channel.configureBlocking(false); channel.register(connectSelector, SelectionKey.OP_READ); } catch (IOException e) { logError("Error while accepting", e); try { channel.socket().shutdownOutput(); channel.close(); } catch (Exception ex) { } } } else if (key.isReadable()) { // The incoming connection is ready to be read. Start a // worker that will read the message. SampleSubThreadWorker worker = new SampleSubThreadWorker(); worker.setKey(key); submitWork(worker); } else { logError("Undefined key interest."); } } // for loop } } catch (IOException e) { logError("Error when receiving message.", e); } } public void cleanUpChannel() { try { serverChannel.close(); } catch (Exception e) { } try { connectSelector.close(); } catch (Exception e) { } } private class SampleSubThreadWorker implements IThreadWorker { SelectionKey key; String type; private Manifest manifest; public Manifest getManifest() { return manifest; } public void setManifest(Manifest manifest) { this.manifest = manifest; } public String getType() { return type; } public void setType(String type) { this.type = type; } public void setKey(SelectionKey key) { this.key = key; key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); } public void runWork() { ByteBuffer buffer = ByteBuffer.allocate(4); Manifest manifest = Manifest.getManifest(); setManifest(manifest); SocketChannel channel = null; try { channel = (SocketChannel) key.channel(); channel.socket().setSoTimeout(getReadtimeOut()); channel.read(buffer); buffer.flip(); int length = buffer.asIntBuffer().get(); persistMessage(channel, manifest, length, getReadtimeOut()); queueManifest(manifest); } catch (IOException e) { logError("Error while reading", e); } finally { try { channel.socket().shutdownOutput(); channel.close(); } catch (Exception ex) { } key.selector().wakeup(); key = null; } } @Override public void interruptWork(boolean arg0) { // TODO Auto-generated method stub } @Override public void terminateWork() { // TODO Auto-generated method stub } } }
-
-
Add these super class setter methods to set the properties. If not included, the default values are used.
setConnectionRetryCount(int);
setRetryInterval(long);
setMaximumDelayTime(long);
setIncrementDelayTime(long);