Version 6

    Message Inflow Example

     

    In this example we will use JCA 1.5 message inflow to monitor a directory for xml files and then forward them to an mdb. If the transaction commits, we delete the xml file to show the use of an XAResource.

     

    WARNING: This is not production quality code.

     

    Overview

     

    We need two deployments for this example.

    • The inbound resource adapter that has the job of monitoring the directory and forwarding any "messages" to the application

    • The mdb deployment configured to receive those "messages"

     

    The Resource Adapter

     

    The resource adapter is made up of the following pieces:

    • The resource adapter bean for lifecycle and endpoint activation

    • The activation spec to define parameters configurable on the mdb deployment

    • The message listener class the mdb will implement.

     

    The resource adapter bean

     

    Its major responsibility in this application is keeping track of endpoints,

    responding to events from the application about its own and mdb deployments.

    /*
     * JBoss, the OpenSource J2EE webOS
     * 
     * Distributable under LGPL license.
     * See terms of license at gnu.org.
     */
    package org.jboss.example;
    
    import java.util.Iterator;
    import java.util.Map;
    
    import javax.resource.ResourceException;
    import javax.resource.spi.ActivationSpec;
    import javax.resource.spi.BootstrapContext;
    import javax.resource.spi.ResourceAdapter;
    import javax.resource.spi.ResourceAdapterInternalException;
    import javax.resource.spi.endpoint.MessageEndpoint;
    import javax.resource.spi.endpoint.MessageEndpointFactory;
    import javax.transaction.xa.XAResource;
    
    import org.jboss.logging.Logger;
    
    import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
    
    /**
     * A FileXMLResourceAdapter.
     * 
     * @author <a href="adrian@jboss.com">Adrian Brock</a>
     * @version $Revision: 1.3 $
     */
    public class FileXMLResourceAdapter implements ResourceAdapter
    {
       /** The logger */
       private static final Logger log = Logger.getLogger(FileXMLResourceAdapter.class);
    
       /** The bootstrap context */
       private BootstrapContext ctx;
    
       /** The activations by activation spec */
       private ConcurrentReaderHashMap activations = new ConcurrentReaderHashMap();
    
       public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec spec) throws ResourceException
       {
          FileXMLActivation activation = new FileXMLActivation(ctx.createTimer(), (FileXMLActivationSpec) spec);
          MessageEndpoint endpoint = endpointFactory.createEndpoint(activation);
          activation.setEndpoint(endpoint);
          activations.put(spec, activation);
          try
          {
             activation.start();
          }
          catch (ResourceException e)
          {
             endpoint.release();
             throw e;
          }
       }
    
       public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec spec)
       {
          FileXMLActivation activation = (FileXMLActivation) activations.remove(spec);
          if (activation != null)
             activation.stop();
       }
       
       public XAResource[] getXAResources(ActivationSpec[] specs) throws ResourceException
       {
          // TODO getXAResources
          return null;
       }
       
       public void start(BootstrapContext ctx) throws ResourceAdapterInternalException
       {
          this.ctx = ctx;
       }
    
       public void stop()
       {
          for (Iterator i = activations.values().iterator(); i.hasNext();)
          {
             try
             {
                FileXMLActivation activation = (FileXMLActivation) i.next();
                activation.stop();
             }
             catch (Exception ignored)
             {
                log.debug("Ignored", ignored);
             }
             i.remove();
          }
       }
    }
    

     

    The message listener interface

     

    The MDB will implement this class do define what processing should be

    done for each message.

    /*
     * JBoss, the OpenSource J2EE webOS
     * 
     * Distributable under LGPL license.
     * See terms of license at gnu.org.
     */
    package org.jboss.example;
    
    import org.w3c.dom.Document;
    
    /**
     * An XMLMessageListener.
     * 
     * @author <a href="adrian@jboss.com">Adrian Brock</a>
     * @version $Revision: 1.1 $
     */
    public interface XMLMessageListener
    {
       void processXML(Document document) throws Exception;
    }
    

     

    The activation specification

     

    This javabean is configured at deployment time for the mdb to specify parameters. In this case we have two parameters:

    • directory - where the xml files will be placed

    • period - the length of time between directory scans.

     

    /*
     * JBoss, the OpenSource J2EE webOS
     * 
     * Distributable under LGPL license.
     * See terms of license at gnu.org.
     */
    package org.jboss.example;
    
    import javax.resource.ResourceException;
    import javax.resource.spi.ActivationSpec;
    import javax.resource.spi.InvalidPropertyException;
    import javax.resource.spi.ResourceAdapter;
    
    /**
     * A FileXMLActivationSpec.
     * 
     * @author <a href="adrian@jboss.com">Adrian Brock</a>
     * @version $Revision: 1.1 $
     */
    public class FileXMLActivationSpec implements ActivationSpec
    {
       private ResourceAdapter ra;
       
       private String directory;
    
       private long period = 10000;
       
       public void validate() throws InvalidPropertyException
       {
          // TODO validate
       }
       
       public String getDirectory()
       {
          return directory;
       }
       
       public void setDirectory(String directory)
       {
          this.directory = directory;
       }
    
       public long getPeriodValue()
       {
          return period;
       }
    
       public String getPeriod()
       {
          return Long.toString(period);
       }
       
       public void setPeriod(String period)
       {
          this.period = Long.parseLong(period);
       }
    
       public ResourceAdapter getResourceAdapter()
       {
          return ra;
       }
       
       public void setResourceAdapter(ResourceAdapter ra) throws ResourceException
       {
          this.ra = ra;
       }
       
       public String toString()
       {
          return "FileXMLActivationSpec for directory " + directory;
       }
    }
    

     

    A helper class for implementation

     

    This class implements the endpoint activation. It is also a bogus (the example is really a bit contrived) use of an XAResource to take part in the transaction. In this case we remove the file at transaction commit.

    /*
     * JBoss, the OpenSource J2EE webOS
     * 
     * Distributable under LGPL license.
     * See terms of license at gnu.org.
     */
    package org.jboss.example;
    
    import java.lang.reflect.Method;
    import java.io.File;
    import java.util.Timer;
    import java.util.TimerTask;
    
    import javax.resource.ResourceException;
    import javax.resource.spi.endpoint.MessageEndpoint;
    import javax.transaction.xa.XAException;
    import javax.transaction.xa.XAResource;
    import javax.transaction.xa.Xid;
    import javax.xml.parsers.DocumentBuilder;
    import javax.xml.parsers.DocumentBuilderFactory;
    
    import org.jboss.logging.Logger;
    import org.w3c.dom.Document;
    import org.xml.sax.InputSource;
    
    /**
     * A FileXMLActivation.
     * 
     * @author <a href="adrian@jboss.com">Adrian Brock</a>
     * @version $Revision: 1.1 $
     */
    public class FileXMLActivation extends TimerTask implements XAResource
    {
       /** The log */
       private static final Logger log = Logger.getLogger(FileXMLActivation.class);
    
       /** The timer */
       private Timer timer;
       
       /** The activation spec */
       private FileXMLActivationSpec spec;
    
       /** The message endpoint */
       private MessageEndpoint endpoint;
    
       /** The directory */
       private File directory;
    
       /** The current file */
       private File currentFile;
    
       /** The document builder */
       DocumentBuilder builder;
    
       /** The process xml method */
       private static final Method PROCESSXML;
    
       static
       {
          try
          {
             PROCESSXML = XMLMessageListener.class.getMethod("processXML", new Class[] { Document.class });
          }
          catch (Exception e)
          {
             throw new RuntimeException(e);
          }
       }
    
       public FileXMLActivation(Timer timer, FileXMLActivationSpec spec)
       {
          this.timer = timer;
          this.spec = spec;
       }
    
       public void setEndpoint(MessageEndpoint endpoint)
       {
          this.endpoint = endpoint;
       }
    
       public void start() throws ResourceException
       {
          directory = new File(spec.getDirectory());
          if (!directory.exists())
             throw new ResourceException(directory + " does not exist");
          if (!directory.isDirectory())
             throw new ResourceException(directory + " is not a directory");
    
          DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
          try
          {
             builder = dbf.newDocumentBuilder();
          }
          catch (Exception e)
          {
             throw new ResourceException(e.toString());
          }
    
          timer.schedule(this, 0l, spec.getPeriodValue());
       }
    
       public void stop()
       {
          cancel();
          timer.cancel();
          endpoint.release();
       }
    
       public void run()
       {
          File[] files = directory.listFiles();
          for (int i = 0; i < files.length; ++i)
          {
             if (files[i].isFile())
             {
                currentFile = files[i];
                try
                {
                   Document doc = parseFile(files[i]);
                   if (doc != null)
                      processXML(doc);
                }
                finally
                {
                   currentFile = null;
                }
             }
          }
       }
    
       protected Document parseFile(File file)
       {
          try
          {
             InputSource is = new InputSource(file.toURL().toString());
             return builder.parse(is);
          }
          catch (Throwable t)
          {
             log.error("Error parsing file " + file, t);
             return null;
          }
       }
    
       protected void processXML(Document doc)
       {
          try
          {
             endpoint.beforeDelivery(PROCESSXML);
    
             // At this point we are in the transaction and we have the mdb's classloader
    
             try
             {
                ((XMLMessageListener) endpoint).processXML(doc);
             }
             finally
             {
                // This must be invoked if beforeDelivery was invoked
                endpoint.afterDelivery();
             }
          }
          catch (Throwable t)
          {
             log.error("Error in message listener", t);
          }
       }
    
       // XAResource implementation (a bad implementation)
    
       public void start(Xid xid, int flags)
       {
       }
    
       public void end(Xid xid, int flags)
       {
       }
    
       public int prepare(Xid xid)
       {
          return XAResource.XA_OK;
       }
    
       public void rollback(Xid xid)
       {
       }
    
       public void commit(Xid xid, boolean onePhase) throws XAException
       {
          currentFile.delete();
       }
    
       public void forget(Xid xid)
       {
       }
    
       public Xid[] recover(int flag)
       {
          return new Xid[0];
       }
    
       public int getTransactionTimeout()
       {
          return 0;
       }
    
       public boolean setTransactionTimeout(int seconds)
       {
          return false;
       }
    
       public boolean isSameRM(XAResource xares)
       {
          return (xares == this);
       }
    }
    

     

    The deployment descriptor for the rar

    <?xml version="1.0" encoding="UTF-8"?>
    <connector xmlns="http://java.sun.com/xml/ns/j2ee"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee
               http://java.sun.com/xml/ns/j2ee/connector_1_5.xsd"
               version="1.5">
    
       <description>File xml resource adapter</description>
       <display-name>Inflow of file xml Resource Adapter</display-name>
       <vendor-name>JBoss, Inc</vendor-name>
       <eis-type>JBoss Example</eis-type>
       <resourceadapter-version>4.0</resourceadapter-version>
    
       <license>
          <description>
          COPYRIGHT AND PERMISSION NOTICE
          Copyright (c) 2004 JBoss, Inc
          This is released under the terms of the LGPL.
          See gnu.org for details.
          </description>
          <license-required>true</license-required>
       </license>
    
       <resourceadapter>
          <resourceadapter-class>org.jboss.example.FileXMLResourceAdapter</resourceadapter-class>
    
          <inbound-resourceadapter>
             <messageadapter>        
                <messagelistener>
                   <messagelistener-type>org.jboss.example.XMLMessageListener</messagelistener-type>
                   <activationspec>
                      <activationspec-class>org.jboss.example.FileXMLActivationSpec</activationspec-class>
                      <required-config-property>
                          <config-property-name>directory</config-property-name>
                      </required-config-property>
                   </activationspec>
                </messagelistener>
             </messageadapter>
          </inbound-resourceadapter>
    
       </resourceadapter>
    </connector>
    

     

    The message driven bean

     

    This is a separate ejb deployment. You can have as many of these as you like for a given resource adapter. You just specify different activation spec

    properties.

     

    This is a simple mdb that just dumps the xml to the console.

    Notice it implements our message listener not the jms message listener.

    /*
     * JBoss, the OpenSource J2EE webOS
     * 
     * Distributable under LGPL license.
     * See terms of license at gnu.org.
     */
    package org.jboss.example;
    
    import java.io.ByteArrayOutputStream;
    
    import javax.ejb.EJBException;
    import javax.ejb.MessageDrivenBean;
    import javax.ejb.MessageDrivenContext;
    import javax.xml.transform.Transformer;
    import javax.xml.transform.TransformerFactory;
    import javax.xml.transform.dom.DOMSource;
    import javax.xml.transform.stream.StreamResult;
    
    import org.jboss.example.XMLMessageListener;
    import org.jboss.logging.Logger;
    import org.w3c.dom.Document;
    
    /**
     * Prints the xml.
     * 
     * @author <a href="adrian@jboss.com">Adrian Brock</a>
     * @version $Revision: 1.1 $
     */
    public class EchoXMLMessageListener implements MessageDrivenBean, XMLMessageListener
    {
       private static final Logger log = Logger.getLogger(EchoXMLMessageListener.class);
    
       private MessageDrivenContext ctx;
    
       private Transformer transformer;
       
       public void processXML(Document document) throws Exception
       {
          DOMSource source = new DOMSource(document);
          ByteArrayOutputStream baos = new ByteArrayOutputStream();
          StreamResult result = new StreamResult(baos);
          transformer.transform(source, result);
          log.info(baos.toString());
       }
    
       public void ejbCreate()
       {
          TransformerFactory tf = TransformerFactory.newInstance();
          try
          {
             transformer = tf.newTransformer();
          }
          catch (Exception e)
          {
             throw new EJBException(e);
          }
       }
       
       public void ejbRemove()
       {
       }
    
       public void setMessageDrivenContext(MessageDrivenContext ctx)
       {
          this.ctx = ctx;
       }
    }
    

     

    The mdb deployment descriptors

     

    ejb-jar.xml - besides the normal deployment information you specify <activation-config-property> values to configure the activation spec.

    <?xml version="1.0" encoding="UTF-8"?>
    <ejb-jar xmlns="http://java.sun.com/xml/ns/j2ee"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee
             http://java.sun.com/xml/ns/j2ee/ejb-jar_2_1.xsd"
             version="2.1">
    
       <enterprise-beans>
    
          <message-driven>
             <ejb-name>EchoXMLMDB</ejb-name>
             <ejb-class>org.jboss.example.EchoXMLMessageListener</ejb-class>
             <messaging-type>org.jboss.example.XMLMessageListener</messaging-type>
             <activation-config>
                <activation-config-property>
                   <activation-config-property-name>directory</activation-config-property-name>
                   <activation-config-property-value>c:\testfilexml</activation-config-property-value>
                </activation-config-property>
                <activation-config-property>
                   <activation-config-property-name>period</activation-config-property-name>
                   <activation-config-property-value>5000</activation-config-property-value>
                </activation-config-property>
            </activation-config>
            <transaction-type>Container</transaction-type>
          </message-driven>
    
       </enterprise-beans>
    
       <assembly-descriptor>
    
          <container-transaction>
             <method>
                <ejb-name>EchoXMLMDB</ejb-name>
                <method-name>*</method-name>
             </method>
             <trans-attribute>Required</trans-attribute>
          </container-transaction>
    
       </assembly-descriptor>
    
    </ejb-jar>
    

     

    jboss.xml - the only thing required in here is the identity of the resource adapter

    <?xml version="1.0" encoding="UTF-8"?>
    
    <jboss>
       <enterprise-beans>
          <message-driven>
             <ejb-name>EchoXMLMDB</ejb-name>
             <destination-jndi-name>dummy</destination-jndi-name>
             <resource-adapter-name>filexmlrar.rar</resource-adapter-name>
          </message-driven>
       </enterprise-beans>
    </jboss>
    

     

    Building the example

     

    Two ant projects are attached

    • filexmlrar - the resource adapter

    • filexmlmdb - the test mdb

    You need to change the location of your jboss deployment in build.properties

     

    You also need to change the location of the directory that is being

    scanned in jboss.xml (the example uses c:\testfilexml)

     

    Obviously the rar must be deployed before the mdb

     

    Output

     

    Deploy the rar

    14:02:41,403 INFO  [RARDeployment] Required license terms exist view the META-INF/ra.xml: file:/C:/jboss-head/workspace/
    build/output/jboss-4.0.0RC2/server/default/deploy/filexmlrar.rar
    

    Deploy the mdb

    14:03:02,414 INFO  [EjbModule] Deploying EchoXMLMDB
    14:03:03,655 INFO  [EJBDeployer] Deployed: file:/C:/jboss-head/workspace/build/output/jboss-4.0.0RC2/server/default/depl
    oy/filexmlmdb.jar
    

    Finally copy an xml file into the scanned directory (c:\testfilexml)

    14:03:13,730 INFO  [EchoXMLMessageListener] <?xml version="1.0" encoding="UTF-8"?><ejb-jar xmlns="http://java.sun.com/xm
    l/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" version="2.1" xsi:schemaLocation="http://java.sun.com/x
    ml/ns/j2ee          http://java.sun.com/xml/ns/j2ee/ejb-jar_2_1.xsd">
    
       <enterprise-beans>
    
          <message-driven>
             <ejb-name>EchoXMLMDB</ejb-name>
             <ejb-class>org.jboss.example.EchoXMLMessageListener</ejb-class>
             <messaging-type>org.jboss.example.XMLMessageListener</messaging-type>
             <activation-config>
                <activation-config-property>
                   <activation-config-property-name>directory</activation-config-property-name>
                   <activation-config-property-value>c:\testfilexml</activation-config-property-value>
                </activation-config-property>
                <activation-config-property>
                   <activation-config-property-name>period</activation-config-property-name>
                   <activation-config-property-value>5000</activation-config-property-value>
                </activation-config-property>
            </activation-config>
            <transaction-type>Container</transaction-type>
          </message-driven>
    
       </enterprise-beans>
    
       <assembly-descriptor>
    
          <container-transaction>
             <method>
                <ejb-name>EchoXMLMDB</ejb-name>
                <method-name>*</method-name>
             </method>
             <trans-attribute>Required</trans-attribute>
          </container-transaction>
    
       </assembly-descriptor>
    
    </ejb-jar>