Writing Custom Polling Channel with Retry Reconnect and Back-off

Writing Custom Polling Channel with Retry Reconnect and Back-off

  1. Write an incoming communication channel class.

    The class must extend another class called com.infor.ec.communication.PollingChannel.

  2. Implement these abstract methods in the super class:
    • initiateChannel

    • getDelayTime

    • cleanUpChannel

  3. Implement the runChannel method.

    Use these guidelines:

    • You must add the persistMessage and queueManifest methods

    • Add these properties in the java code:

      Retry Reconnect Properties ConnectionRetryCount, RetryInterval

      Back-off Properties MaximumDelayTime, IncrementDelayTime

      See Poll frequency and Retry Reconnect.

    • In your Java code, define how to pull data. See the example Java code for the SamplePollinChannel.java Custom Polling Channel.

    Use this sample Java code:

    
    
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    import org.apache.log4j.Category;
    import com.infor.ec.communication.PollingChannel;
    import com.intentia.ec.server.ThreadPool;
    import com.intentia.ec.shared.IThreadWorker;
    import com.intentia.ec.shared.Manifest;
    import com.intentia.ec.shared.dbitems.ChannelProperty;
    
    public class SamplePollinChannel extends PollingChannel {
    	// A log4J category, used for logging.
    	public static Category cat = Category
    			.getInstance(SamplePollinChannel.class.getName());
    	// The port number. Set it to be 222222 as default.
    	private int myPort = 222222;
    	
    	// We are dealing with incoming sockets, hence we need a
    	// ServerSocketChannel.
    	private ServerSocketChannel serverChannel;
    	// Since we are using java.nio we needs a Selector.
    	private Selector connectSelector;
    
    	public SamplePollinChannel() {
    		super(cat);
    	}
    
    	public boolean initiateChannel(ChannelProperty[] props) {
    		// Start by extracting the property for the port value.
    		String propKey = "Port";
    		// Retrieve the correct ChannelProperty from the array
    		ChannelProperty prop = getProperty(props, propKey);
    		String value = getValue(prop);
    
    		// If the property did not exist we return false.
    		if (prop == null)
    			return false;
    		if (value == null) {
    			// If value is null but mandatory we issue a warning and use the
    			// implemented default value.
    			if (prop.isMandatory()) {
    				cat.warn("Using internal default value: " + myPort);
    			}
    		} else {
    			try {
    				int tmp = Integer.parseInt(value);
    				if (tmp <= 0) {
    					throw new IllegalArgumentException(
    							"Negative or zero value.");
    				}
    				myPort = tmp;
    			} catch (Exception e) {
    				cat.warn(propKey
    						+ " property for SamplePollinChannel is	invalid. Using internal default value: "
    						+ myPort);
    			}
    		}
    		// Do the same for the MaxSubThreads-property.
    		propKey = "MaxSubThreads";
    		prop = getProperty(props, propKey);
    		value = getValue(prop);
    		if (prop == null)
    			return false;
    		if (value == null) {
    			if (prop.isMandatory()) {
    				cat.warn("Using internal default value: " + maxSubThreads);
    			}
    		} else {
    			try {
    				int tmp = Integer.parseInt(value);
    				if (tmp <= 0) {
    					throw new IllegalArgumentException(
    							"Negative or zero value.");
    				}
    				maxSubThreads = tmp;
    			} catch (Exception e) {
    				cat.warn(propKey
    						+ " property for SamplePollinChannel is	invalid. Using internal default value: "
    						+ maxSubThreads);
    			}
    		}
    		// Finally the ReadTimeOut-property.
    		propKey = "ReadTimeOut";
    		prop = getProperty(props, propKey);
    		value = getValue(prop);
    		if (prop == null)
    			return false;
    		if (value == null) {
    			if (prop.isMandatory()) {
    				cat.warn("Using internal default value: " + getReadtimeOut());
    			}
    		} else {
    			try {
    				int tmp = Integer.parseInt(value);
    				if (tmp <= 0) {
    					throw new IllegalArgumentException(
    							"Negative or zero value.");
    				}
    				setReadtimeOut(tmp);
    			} catch (Exception e) {
    				cat.warn(propKey
    						+ " property for SamplePollinChannel is invalid.	Using internal default value: "
    						+ getReadtimeOut());
    			}
    		}
    		
    		// Connection Retry when error occurs
    		// Properties involved: ConnectionRetryCount, RetryInterval
    		propKey = "ConnectionRetryCount";
    		prop = getProperty(props, propKey);
    		value = getValue(prop);
    		if (prop == null) return false;
    		if (value == null)  {
    			if (prop.isMandatory()) return false;
    		}
    		else {
    			try {
    				int tmp = Integer.parseInt(value);
    				if (tmp < 0) {
    					logWarn("Using internal default value: " + getConnectionRetryCount());
    					throw new IllegalArgumentException("Negative or zero value.");
    				}
    				this.setConnectionRetryCount(tmp);
    			}
    			catch (Exception e) {
    				logWarn(propKey + " property for SamplePollinChannel is invalid. Using internal default value: " + getConnectionRetryCount());
    			}
    		}
    		
    		propKey = "RetryInterval";
    	    prop = getProperty(props, propKey);
    		value = getValue(prop);
    		if (prop == null) return false;
    		if (value == null)  {
    			if (prop.isMandatory()) {
    				logWarn("Using internal default value: " + getRetryInterval());
    			}
    		}
    		else {
    			try {
    				long num = Long.parseLong(value);
    				if (num <= 0) {
    					throw new IllegalArgumentException("Negative or zero value.");
    				}
    				setRetryInterval(num);
    			}
    			catch (Exception e) {
    				logWarn(propKey + " property for SamplePollinChannel is invalid. Using internal default value: " + getRetryInterval());
    			}
    		}
    		
    		// Back-off feature
    		// Properties involved: MaximumDelayTime, IncrementDelayTime
    		propKey = "MaximumDelayTime";
    	    prop = getProperty(props, propKey);
    		value = getValue(prop);
    		if (prop == null) return false;
    		if (value == null)  {
    			if (prop.isMandatory()) {
    				logWarn("Using internal default value: " + getMaximumDelayTime());
    			}
    		}
    		else {
    			try {
    				long num = Long.parseLong(value);
    				if (num <= 0) {
    					throw new IllegalArgumentException("Negative or zero value.");
    				}
    				setMaximumDelayTime(num);
    			}
    			catch (Exception e) {
    				logWarn(propKey + " property for SamplePollinChannel is invalid. Using internal default value: " + getMaximumDelayTime());
    			}
    		}
    		
    		propKey = "IncrementDelayTime";
    	    prop = getProperty(props, propKey);
    		value = getValue(prop);
    		if (prop == null) return false;
    		if (value == null)  {
    			if (prop.isMandatory()) {
    				logWarn("Using internal default value: " + getIncrementDelayTime());
    			}
    		}
    		else {
    			try {
    				long num = Long.parseLong(value);
    				if (num <= 0) {
    					throw new IllegalArgumentException("Negative or zero value.");
    				}
    				setIncrementDelayTime(num);
    			}
    			catch (Exception e) {
    				logWarn(propKey + " property for SamplePollinChannel is invalid. Using internal default value: " + getIncrementDelayTime());
    			}
    		}
    				
    		// Since we are going to use sub threads we register this channel to the
    		// MEC ThreadPool.
    		ThreadPool.getInstance().register(getClass().getName(), maxSubThreads);
    		// Everything went fine, therefore return true.
    		return true;
    	}
    
    	public long getDelayTime() {
    		return 0;
    	}
    
    	public void runChannel() {
    		// Create the ServerSocketChannel object and register in a java.nio
    		// fashion.
    		if (serverChannel == null) {
    			try {
    				connectSelector = Selector.open();
    				serverChannel = ServerSocketChannel.open();
    				serverChannel.configureBlocking(false);
    				InetSocketAddress address = new InetSocketAddress(myPort);
    				serverChannel.socket().bind(address);
    				serverChannel.register(connectSelector, SelectionKey.OP_ACCEPT);
    				logDebug(getPlugInName() + " server started");
    			} catch (IOException e) {
    				logError("Error when inititating " + getPlugInName()
    						+ ".	Shutting down plug-in.", e);
    				super.stopPlugIn();
    				return;
    			}
    		}
    		try {
    			// Note, the connectSelector.select() will block until a
    			// connection becomesavailable. That's why
    			// getDelayTime() returns zero.
    			if (connectSelector.select() > 0) {
    				Set readyKeys = connectSelector.selectedKeys();
    				for (Iterator i = readyKeys.iterator(); i.hasNext();) {
    					SelectionKey key = (SelectionKey) i.next();
    					i.remove();
    					if (!key.isValid())
    						continue;
    					if (key.isAcceptable()) {
    						SocketChannel channel = null;
    						try {
    							channel = ((ServerSocketChannel) key.channel())
    									.accept();
    							channel.configureBlocking(false);
    							channel.register(connectSelector,
    									SelectionKey.OP_READ);
    						} catch (IOException e) {
    							logError("Error while accepting", e);
    							try {
    								channel.socket().shutdownOutput();
    								channel.close();
    							} catch (Exception ex) {
    							}
    						}
    					} else if (key.isReadable()) {
    						// The incoming connection is ready to be read. Start a
    						// worker that will read the message.
    						SampleSubThreadWorker worker = new SampleSubThreadWorker();
    						worker.setKey(key);
    						submitWork(worker);
    					} else {
    						logError("Undefined key interest.");
    					}
    				} // for loop
    			}
    		} catch (IOException e) {
    			logError("Error when receiving message.", e);
    		}
    	}
    
    	public void cleanUpChannel() {
    		try {
    			serverChannel.close();
    		} catch (Exception e) {
    		}
    
    		try {
    			connectSelector.close();
    		} catch (Exception e) {
    		}
    	}
    
    	private class SampleSubThreadWorker implements IThreadWorker {
    		SelectionKey key;
    		String type;
    
    		private Manifest manifest;
    
    		public Manifest getManifest() {
    			return manifest;
    		}
    
    		public void setManifest(Manifest manifest) {
    			this.manifest = manifest;
    		}
    
    		public String getType() {
    			return type;
    		}
    
    		public void setType(String type) {
    			this.type = type;
    		}
    
    		public void setKey(SelectionKey key) {
    			this.key = key;
    			key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
    		}
    
    		public void runWork() {
    			ByteBuffer buffer = ByteBuffer.allocate(4);
    			Manifest manifest = Manifest.getManifest();
    			setManifest(manifest);
    			SocketChannel channel = null;
    			try {
    				channel = (SocketChannel) key.channel();
    				channel.socket().setSoTimeout(getReadtimeOut());
    				channel.read(buffer);
    				buffer.flip();
    				int length = buffer.asIntBuffer().get();
    				persistMessage(channel, manifest, length, getReadtimeOut());
    				queueManifest(manifest);
    			} catch (IOException e) {
    				logError("Error while reading", e);
    			} finally {
    				try {
    					channel.socket().shutdownOutput();
    					channel.close();
    				} catch (Exception ex) {
    				}
    				key.selector().wakeup();
    				key = null;
    			}
    		}
    
    		@Override
    		public void interruptWork(boolean arg0) {
    			// TODO Auto-generated method stub
    
    		}
    
    		@Override
    		public void terminateWork() {
    			// TODO Auto-generated method stub
    
    		}
    	}
    }
    
  4. Add these super class setter methods to set the properties. If not included, the default values are used.

    setConnectionRetryCount(int);

    setRetryInterval(long);

    setMaximumDelayTime(long);

    setIncrementDelayTime(long);