Monday, 15 September 2014

Textual description of firstImageUrl

MULE FTP : Create Directory If Not Exist

The FTP and SFTP endpoints of Mule do not create a directory on the fly if that directory does not already exist at the FTP location.
It throws error like this :
ERROR 2007-11-05 15:24:29,192 [connector.ftp.0.receiver.1] org.mule.impl.DefaultExceptionStrategy: Caught exception in Exception Strategy: Failed to change working directory to /IN. Ftp error: 550
java.io.IOException: Failed to change working directory to /IN. Ftp error: 550
        at org.mule.providers.ftp.FtpMessageReceiver.listFiles(FtpMessageReceiver.java:103)
        at org.mule.providers.ftp.FtpMessageReceiver.poll(FtpMessageReceiver.java:72)
        at org.mule.providers.PollingReceiverWorker.run(PollingReceiverWorker.java:47)
        at org.mule.impl.work.WorkerContext.run(WorkerContext.java:310)
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:987)
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:528)
        at java.lang.Thread.run(Thread.java:619) 
There is an Open Issue for this functionality .

To implement this functionality for FTP, we have to create a custom FTP message dispatcher and integrate it into the Mule FTP connector.


First, change the Mule FTP connector so that it uses the custom dispatcher factory (CustomFtpMessageDispatcherFactory), which in turn creates the custom dispatcher, like this:
<!-- FTP connector whose dispatcher factory is overridden with the custom factory class shown below. -->
<ftp:connector name="ftp_con">
  <service-overrides dispatcherFactory="CustomFtpMessageDispatcherFactory"></service-overrides>
 </ftp:connector>
    <!-- Example flow: reads files from a local directory and sends them to the FTP endpoint. -->
    <flow name="sftpExampleFlow1" doc:name="sftpExampleFlow1">
     
        <file:inbound-endpoint path="E:/ftp" responseTimeout="10000" doc:name="File"/>
        <!-- The outbound endpoint references ftp_con. Because its outputPattern contains "/",
             the custom dispatcher treats every segment before the last one as a directory
             below the endpoint path and the last segment as the file name. -->
        <ftp:outbound-endpoint host="localhost" port="21" responseTimeout="10000" doc:name="FTP" connector-ref="ftp_con"
        outputPattern="custom/path/myFile.txt" path="/home/ftp" 
        />
    </flow>
And here's the DispatcherFactory class .

import org.mule.api.MuleException;
import org.mule.api.endpoint.OutboundEndpoint;
import org.mule.api.transport.MessageDispatcher;
import org.mule.transport.ftp.FtpMessageDispatcherFactory;

/**
 * Dispatcher factory that supplies {@link CustomFtpMessageDispatcher} instances
 * in place of the dispatcher created by {@link FtpMessageDispatcherFactory}.
 */
public class CustomFtpMessageDispatcherFactory extends FtpMessageDispatcherFactory
{
    /**
     * Builds a new {@link CustomFtpMessageDispatcher} bound to the given endpoint.
     *
     * @param endpoint the outbound endpoint the dispatcher will serve
     * @return the newly created dispatcher
     * @throws MuleException declared by the factory contract
     */
    public MessageDispatcher create(OutboundEndpoint endpoint) throws MuleException
    {
        final MessageDispatcher customDispatcher = new CustomFtpMessageDispatcher(endpoint);
        return customDispatcher;
    }
}


And here's the CustomFtpMessageDispatcher, which first creates any directories named in the outputPattern (if they do not already exist) before uploading the file.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTPClient;
import org.mule.api.MuleEvent;
import org.mule.api.endpoint.OutboundEndpoint;
import org.mule.api.transport.DispatchException;
import org.mule.config.i18n.CoreMessages;
import org.mule.model.streaming.CallbackOutputStream;
import org.mule.transport.ConnectException;
import org.mule.transport.ftp.FtpConnector;
import org.mule.transport.ftp.FtpMessageDispatcher;

/**
 * FTP dispatcher that creates missing remote directories before uploading.
 *
 * <p>When the endpoint's output pattern contains a {@code /}, every segment
 * before the last one is treated as a directory below the endpoint path. Each
 * directory is entered, and created first if it does not exist; the last
 * segment is then used as the file name. Output patterns without a {@code /}
 * are handled by the connector's regular output stream.
 */
public class CustomFtpMessageDispatcher extends FtpMessageDispatcher
{

    Log logger = LogFactory.getLog(CustomFtpMessageDispatcher.class);

    /**
     * @param endpoint the outbound FTP endpoint this dispatcher writes to
     */
    public CustomFtpMessageDispatcher(OutboundEndpoint endpoint)
    {
        super(endpoint);
    }

    /**
     * Writes the event payload to the FTP server, creating the directories
     * named in the output pattern when necessary.
     *
     * @param event the event whose payload is uploaded
     * @throws ConnectException  rethrown unwrapped so that a retry policy can act on it
     * @throws DispatchException if the remote directories or the upload stream cannot be set up
     * @throws Exception         if writing the payload or closing the stream fails
     */
    protected void doDispatch(MuleEvent event) throws Exception
    {
        Object data = event.getMessage().getPayload();
        String outputPattern = (String) endpoint.getProperty(FtpConnector.PROPERTY_OUTPUT_PATTERN);
        String basePath = endpoint.getEndpointURI().getPath();
        if (basePath.endsWith("/"))
        {
            basePath = basePath.substring(0, basePath.length() - 1);
        }

        OutputStream out;
        if (outputPattern != null && outputPattern.contains("/"))
        {
            out = openStreamBelow(basePath, outputPattern, event);
        }
        else
        {
            out = connector.getOutputStream(getEndpoint(), event);
        }

        writePayload(data, out, event.getEncoding());
    }

    /**
     * Obtains an FTP client and opens the upload stream for {@code outputPattern}
     * below {@code basePath}, releasing the client if anything fails on the way.
     */
    private OutputStream openStreamBelow(String basePath, String outputPattern, MuleEvent event) throws Exception
    {
        try
        {
            final FtpConnector ftpConnector = (FtpConnector) endpoint.getConnector();
            final FTPClient client = ftpConnector.getFtp(endpoint.getEndpointURI());
            try
            {
                return storeInDirectories(ftpConnector, client, basePath, outputPattern);
            }
            catch (Exception e)
            {
                // No stream was handed out, so nobody else will ever release this client.
                releaseQuietly(ftpConnector, client);
                throw e;
            }
        }
        catch (ConnectException ce)
        {
            // Don't wrap a ConnectException, otherwise the retry policy will not go into effect.
            throw ce;
        }
        catch (Exception e)
        {
            throw new DispatchException(CoreMessages.streamingFailedNoStream(), event, (OutboundEndpoint)endpoint, e);
        }
    }

    /**
     * Enters (creating where needed) every directory segment of the pattern and
     * starts storing the file named by its last segment. The returned stream
     * completes the FTP transfer and releases the client when it is closed.
     */
    private OutputStream storeInDirectories(final FtpConnector ftpConnector, final FTPClient client,
                                            String basePath, String outputPattern) throws IOException
    {
        String relativePattern = outputPattern;
        if (relativePattern.startsWith("/"))
        {
            relativePattern = relativePattern.substring(1);
        }

        // Limit -1 keeps trailing empty segments, so the last element is always the file name.
        String[] segments = relativePattern.split("/", -1);
        String currentPath = basePath;
        for (int i = 0; i < segments.length - 1; i++)
        {
            if (!segments[i].isEmpty())
            {
                currentPath = currentPath + "/" + segments[i];
                enterDirectory(client, currentPath);
            }
        }

        String filename = segments[segments.length - 1];
        OutputStream rawStream = client.storeFileStream(filename);
        if (rawStream == null)
        {
            throw new IOException("FTP operation failed: " + client.getReplyString());
        }

        return new CallbackOutputStream(rawStream, new CallbackOutputStream.Callback()
        {
            public void onClose() throws Exception
            {
                try
                {
                    if (!client.completePendingCommand())
                    {
                        client.logout();
                        client.disconnect();
                        throw new IOException("FTP Stream failed to complete pending request");
                    }
                }
                finally
                {
                    ftpConnector.releaseFtp(endpoint.getEndpointURI(), client);
                }
            }
        });
    }

    /**
     * Makes {@code path} the client's working directory, creating it first if
     * the server refuses the initial change.
     *
     * @throws IOException if the directory can neither be entered nor created
     */
    private void enterDirectory(FTPClient client, String path) throws IOException
    {
        if (client.changeWorkingDirectory(path))
        {
            return;
        }

        // The result of makeDirectory is not decisive on its own: the directory may have
        // been created by someone else in the meantime. What matters is whether we can
        // enter it afterwards, because the file is stored relative to the working directory.
        client.makeDirectory(path);
        if (!client.changeWorkingDirectory(path))
        {
            throw new IOException("Failed to create or change to FTP directory " + path
                + ". Ftp reply: " + client.getReplyString());
        }
    }

    /** Returns the client to the connector, logging (not propagating) any failure. */
    private void releaseQuietly(FtpConnector ftpConnector, FTPClient client)
    {
        try
        {
            ftpConnector.releaseFtp(endpoint.getEndpointURI(), client);
        }
        catch (Exception releaseFailure)
        {
            // Logged only, so that the original failure reaches the caller.
            logger.warn("Failed to release FTP client after a failed dispatch", releaseFailure);
        }
    }

    /**
     * Copies the payload to {@code out} and always closes {@code out}; a payload
     * that is an {@link InputStream} is closed as well, even if copying fails.
     *
     * @param data     the payload: an InputStream, a byte[], or any object written via toString()
     * @param out      the FTP upload stream
     * @param encoding the encoding used to turn a non-binary payload into bytes
     */
    private void writePayload(Object data, OutputStream out, String encoding) throws IOException
    {
        try
        {
            if (data instanceof InputStream)
            {
                InputStream is = (InputStream) data;
                try
                {
                    IOUtils.copy(is, out);
                }
                finally
                {
                    is.close();
                }
            }
            else
            {
                byte[] dataBytes;
                if (data instanceof byte[])
                {
                    dataBytes = (byte[]) data;
                }
                else
                {
                    dataBytes = data.toString().getBytes(encoding);
                }
                IOUtils.write(dataBytes, out);
            }
        }
        finally
        {
            out.close();
        }
    }

}


Post Comments And Suggestions !!!