Mule's FTP and SFTP endpoints do not create a directory on the fly if it does not exist at the FTP location. Instead, an error like this is thrown:
ERROR 2007-11-05 15:24:29,192 [connector.ftp.0.receiver.1] org.mule.impl.DefaultExceptionStrategy: Caught exception in Exception Strategy: Failed to change working directory to /IN. Ftp error: 550
java.io.IOException: Failed to change working directory to /IN. Ftp error: 550
    at org.mule.providers.ftp.FtpMessageReceiver.listFiles(FtpMessageReceiver.java:103)
    at org.mule.providers.ftp.FtpMessageReceiver.poll(FtpMessageReceiver.java:72)
    at org.mule.providers.PollingReceiverWorker.run(PollingReceiverWorker.java:47)
    at org.mule.impl.work.WorkerContext.run(WorkerContext.java:310)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:987)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:528)
    at java.lang.Thread.run(Thread.java:619)
There is an open issue for this functionality. To implement it for FTP, we have to create a custom FTP message dispatcher and plug it into the Mule FTP connector. So first, configure the Mule FTP connector to use a custom dispatcher factory (CustomFtpMessageDispatcherFactory), like this:
<ftp:connector name="ftp_con"> <service-overrides dispatcherFactory="CustomFtpMessageDispatcherFactory"></service-overrides> </ftp:connector> <flow name="sftpExampleFlow1" doc:name="sftpExampleFlow1"> <file:inbound-endpoint path="E:/ftp" responseTimeout="10000" doc:name="File"/> <ftp:outbound-endpoint host="localhost" port="21" responseTimeout="10000" doc:name="FTP" connector-ref="ftp_con" outputPattern="custom/path/myFile.txt" path="/home/ftp" /> </flow>
And here's the dispatcher factory class.
import org.mule.api.MuleException;
import org.mule.api.endpoint.OutboundEndpoint;
import org.mule.api.transport.MessageDispatcher;
import org.mule.transport.ftp.FtpMessageDispatcherFactory;

/**
 * Dispatcher factory that hands out {@code CustomFtpMessageDispatcher} instances instead of the
 * stock FTP dispatcher.
 *
 * <p>It is meant to be referenced from the FTP connector configuration through the
 * {@code dispatcherFactory} attribute of {@code <service-overrides>}.
 */
public class CustomFtpMessageDispatcherFactory extends FtpMessageDispatcherFactory {

    /**
     * Creates the custom dispatcher for the given outbound endpoint.
     *
     * <p>{@code @Override} is deliberate: if the parent signature ever differs, compilation fails
     * instead of this method silently becoming an unused overload.
     *
     * @param endpoint the outbound endpoint the dispatcher will serve
     * @return a new {@code CustomFtpMessageDispatcher} bound to {@code endpoint}
     * @throws MuleException declared by the parent contract; not thrown by this implementation itself
     */
    @Override
    public MessageDispatcher create(OutboundEndpoint endpoint) throws MuleException {
        return new CustomFtpMessageDispatcher(endpoint);
    }
}
And here's the FTPMessageDispacther , which will first create a directory if the outputPattern has it .
import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.net.ftp.FTPClient; import org.mule.api.MuleEvent; import org.mule.api.endpoint.OutboundEndpoint; import org.mule.api.transport.DispatchException; import org.mule.config.i18n.CoreMessages; import org.mule.model.streaming.CallbackOutputStream; import org.mule.transport.ConnectException; import org.mule.transport.ftp.FtpConnector; import org.mule.transport.ftp.FtpMessageDispatcher; public class CustomFtpMessageDispatcher extends FtpMessageDispatcher { Log logger = LogFactory.getLog(CustomFtpMessageDispatcher.class); public CustomFtpMessageDispatcher(OutboundEndpoint endpoint) { super(endpoint); } protected void doDispatch(MuleEvent event) throws Exception { Object data = event.getMessage().getPayload(); String outputPattern = (String) endpoint.getProperty(FtpConnector.PROPERTY_OUTPUT_PATTERN); String basePath = endpoint.getEndpointURI().getPath(); OutputStream out = null; if (basePath.endsWith("/")) basePath = basePath.substring(0, basePath.length() - 1); if (outputPattern != null && outputPattern.contains("/")) { try { if (outputPattern.startsWith("/")) outputPattern = outputPattern.substring(1, outputPattern.length()); String dirs[] = outputPattern.split("/", -1); final FtpConnector connector = (FtpConnector) endpoint.getConnector(); final FTPClient client = connector.getFtp(endpoint.getEndpointURI()); for (int i = 0; i < dirs.length - 1; i++) { try { if (!dirs[i].isEmpty()) { basePath = basePath + "/" + dirs[i]; if (!client.changeWorkingDirectory(basePath)) client.makeDirectory(basePath); } } catch (Exception e) { logger.error("Error Creating dir on ftp" + e.getMessage()); } } String filename = dirs[dirs.length - 1]; out = client.storeFileStream(filename); if (out == null) { throw new IOException("FTP operation failed: " + 
client.getReplyString()); } out = new CallbackOutputStream(out, new CallbackOutputStream.Callback() { public void onClose() throws Exception { try { if (!client.completePendingCommand()) { client.logout(); client.disconnect(); throw new IOException("FTP Stream failed to complete pending request"); } } finally { connector.releaseFtp(endpoint.getEndpointURI(), client); } } }); } catch (ConnectException ce) { // Don't wrap a ConnectException, otherwise the retry policy will not go into effect. throw ce; } catch (Exception e) { throw new DispatchException(CoreMessages.streamingFailedNoStream(), event, (OutboundEndpoint)endpoint, e); } } else { out = connector.getOutputStream(getEndpoint(), event); } try { if (data instanceof InputStream) { InputStream is = ((InputStream) data); IOUtils.copy(is, out); is.close(); } else { byte[] dataBytes; if (data instanceof byte[]) { dataBytes = (byte[]) data; } else { dataBytes = data.toString().getBytes(event.getEncoding()); } IOUtils.write(dataBytes, out); } } finally { out.close(); } } }Post Comments And Suggestions !!!