Wednesday, 25 June 2014

Textual Representation of logo

Mule File Inbound And Threads

Requirement : File inbound endpoint should pick only limited number of files , once these files are processed then only file inbound should pick other files in the directory .


For this situation , we can use processing strategy as synchronous in our flow . This strategy will pick only one file at a time which is not good .

If we set receiver threading profile on file inbound , it also does not work because file poller keeps acquiring the files after the time period, while some files are still being processed . The other solution is to use fork and join pattern of mule which is explained below .

First we create a custom file component which will give us the list of required number of files .Number of files will be configurable.
package com.javaroots;
import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;


public class FilePollerComponent implements Callable
{

 private String pollDir ;
 
 private int numberOfFiles;
 
 public String getPollDir()
 {
  return pollDir;
 }

 public void setPollDir(String pollDir)
 {
  this.pollDir = pollDir;
 }
 
 

 public int getNumberOfFiles()
 {
  return numberOfFiles;
 }

 public void setNumberOfFiles(int numberOfFiles)
 {
  if(numberOfFiles < 1 )
   throw new RuntimeException("Number of files can not be less than 1");
  this.numberOfFiles = numberOfFiles;
 }

 @Override
 public Object onCall(MuleEventContext eventContext) throws Exception
 {
  File f = new File(pollDir);
  List filesToReturn = new ArrayList(numberOfFiles);
  if(f.isDirectory())
  {
   File[] files = f.listFiles();
   int i = 0;
   for(File file : files)
   {
    if(i==numberOfFiles)
     break ;
    if(file.isFile())
    {
     filesToReturn.add(file);
     i++;
    }
   }
  }
  else
  {
   throw new Exception("Invalid Directory");
  }
  return filesToReturn;
 }

}
And the flow will be like this :
<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
 xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" version="CE-3.3.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd 
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd 
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd ">

<spring:beans>
<spring:bean id="filePoller" class="com.javaroots.FilePollerComponent">
<spring:property name="pollDir" value="E:/fileTest"></spring:property>
<spring:property name="numberOfFiles" value="3"></spring:property>


</spring:bean>

</spring:beans>

<flow name="fileInboundTestFlow" doc:name="fileInboundTestFlow" processingStrategy="synchronous">
     <poll frequency="200000">
      <component doc:name="File Poller">
       <spring-object bean="filePoller"/>
      </component>
     </poll>
     <logger message="Size of payload is : #[message.payload.size()]" level="INFO"/>
       <choice doc:name="Choice"> 
                <when expression="#[message.payload.size() &gt; 0]"> 
       <request-reply >
      <vm:outbound-endpoint path="out" >
        <collection-splitter />
      </vm:outbound-endpoint>
      <vm:inbound-endpoint path="response">
       <message-properties-transformer>
        <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="3" />
       </message-properties-transformer>
      <collection-aggregator/>
      </vm:inbound-endpoint>      
    </request-reply>
   </when>
   <otherwise>
     <logger />
   </otherwise>
  </choice>
    </flow>
    
    <flow name="processor" >
     <vm:inbound-endpoint path="out"/>
      <component class="com.javaroots.SleepComponent" doc:name="sleep"/>
      <vm:outbound-endpoint path="response"/>
    </flow>

</mule>


In the flow , first we put our custom file poller component which will give us list of files not more than 3 . Then we create request reply which will process this list , and it will wait untill all three files are processed . The actual processing of each individual file will happen in another flow , where i have put a sleep component .Request reply passes each payload to vm outbound and at the same vm , the other flow will listen .

Once the flow completes its processing , it returns the response to request reply flow . Collection aggregator blocks untill response from all the three files comes back .So in this way we can control number of files to be processed .

make sure that MULE_CORRELATION_GROUP_SIZE is equal to numberOfFiles which we want to be processed concurrently.

This solution is provided by David Dossot on my stack overflow question

You can see the full source code here

Post Comments and Suggestions !!!