Spring Batch: Creating an FTP Tasklet to get remote files
First, if you are new to Spring Batch, check out the Spring Batch reference documentation and/or this introductory blog post.
In a recent project, I was involved in converting hundreds of mainframe jobs to Spring Batch jobs. Some of these jobs retrieved one or more files from a vendor and then processed them. The files arrived either by FTP (usually with PGP encryption) or by SFTP. I had used Spring Integration in the past to set up FTP polling with great success, but I wanted to be able to use this functionality within a Spring Batch step. Putting the FTP transfer in a step made things easier operationally, because the transfer became part of the job. For example, on a restart the FTP step could be skipped if it had already completed.
So I ended up creating a Tasklet that, among other things, could:
- Poll an FTP site for files matching a file name pattern and download them.
- Retry at a configurable polling interval, up to a configurable number of attempts, until the file(s) are found.
When the Tasklet executes, it uses Spring Integration's FtpInboundFileSynchronizer or SftpInboundFileSynchronizer, depending on the protocol, to download the files from the remote site. Set the retryIfNotFound attribute to true if you want the download retried when no matching file is found. The number of retries and the pause between them are controlled by the downloadFileAttempts and retryIntervalMilliseconds attributes.
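The retry semantics can be illustrated without any FTP server. The following is a minimal, framework-free sketch that mirrors the loop in the Tasklet below. It makes one initial attempt, then up to downloadFileAttempts retries separated by retryIntervalMilliseconds, and throws once the retries are exhausted. The class name RetryPoller and the method pollUntilFound are hypothetical and are not part of Spring Batch or Spring Integration. The BooleanSupplier stands in for "synchronize, then check whether a matching local file exists".

```java
import java.io.FileNotFoundException;
import java.util.function.BooleanSupplier;

public class RetryPoller
{
    /**
     * Runs attempt once, then retries up to maxRetries more times, sleeping
     * intervalMillis before each retry. Returns the total number of attempts made.
     * Throws FileNotFoundException if no attempt ever reports success.
     */
    public static int pollUntilFound(BooleanSupplier attempt, int maxRetries, long intervalMillis)
            throws FileNotFoundException, InterruptedException
    {
        int attempts = 1;
        boolean found = attempt.getAsBoolean();      // initial download and check
        while (!found && attempts <= maxRetries)
        {
            Thread.sleep(intervalMillis);             // pause between polls
            found = attempt.getAsBoolean();           // download again and re-check
            attempts++;
        }
        if (!found)
        {
            throw new FileNotFoundException("Nothing found after " + maxRetries + " retries.");
        }
        return attempts;
    }

    public static void main(String[] args) throws Exception
    {
        int[] calls = {0};
        // Succeeds on the third call: one initial attempt plus two retries.
        int attempts = pollUntilFound(() -> ++calls[0] >= 3, 5, 10);
        System.out.println("Found after " + attempts + " attempts"); // prints "Found after 3 attempts"
    }
}
```

With the defaults declared in the Tasklet (downloadFileAttempts = 12, retryIntervalMilliseconds = 300000), a file that never appears keeps the step waiting for 12 × 5 minutes = 1 hour of sleep time. That wait comes on top of 13 synchronizations, and only then is the exception thrown. Choose these values with your job's schedule in mind.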
Here is a simplified re-creation of the Tasklet, minus getters and setters:
```java
package org.reil.example;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.List;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer;
import org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter;
import org.springframework.integration.ftp.inbound.FtpInboundFileSynchronizer;
import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.util.Assert;

public class FtpGetRemoteFilesTasklet implements Tasklet, InitializingBean
{
    private static final Logger logger = LoggerFactory.getLogger(FtpGetRemoteFilesTasklet.class);

    private File localDirectory;
    private AbstractInboundFileSynchronizer<?> ftpInboundFileSynchronizer;
    private SessionFactory sessionFactory;
    private boolean autoCreateLocalDirectory = true;
    private boolean deleteLocalFiles = true;
    private String fileNamePattern;
    private String remoteDirectory;
    private int downloadFileAttempts = 12;           // retries after the first download attempt
    private long retryIntervalMilliseconds = 300000; // 5 minutes between retries
    private boolean retryIfNotFound = false;

    /* (non-Javadoc)
     * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
     */
    public void afterPropertiesSet() throws Exception
    {
        Assert.notNull(sessionFactory, "sessionFactory attribute cannot be null");
        Assert.notNull(localDirectory, "localDirectory attribute cannot be null");
        Assert.notNull(remoteDirectory, "remoteDirectory attribute cannot be null");
        Assert.notNull(fileNamePattern, "fileNamePattern attribute cannot be null");

        setupFileSynchronizer();

        if (!this.localDirectory.exists())
        {
            if (this.autoCreateLocalDirectory)
            {
                if (logger.isDebugEnabled())
                {
                    logger.debug("The '" + this.localDirectory + "' directory doesn't exist; Will create.");
                }
                this.localDirectory.mkdirs();
            }
            else
            {
                throw new FileNotFoundException(this.localDirectory.getName());
            }
        }
    }

    // Assumption: the protocol is detected from the session factory type, as the
    // DefaultSftpSessionFactory import suggests. A wrapping factory (for example a
    // CachingSessionFactory) would need a different check.
    private boolean isSftp()
    {
        return sessionFactory instanceof DefaultSftpSessionFactory;
    }

    // Creates the protocol-specific synchronizer and restricts it to files matching fileNamePattern.
    private void setupFileSynchronizer()
    {
        if (isSftp())
        {
            ftpInboundFileSynchronizer = new SftpInboundFileSynchronizer(sessionFactory);
            ((SftpInboundFileSynchronizer) ftpInboundFileSynchronizer).setFilter(new SftpSimplePatternFileListFilter(fileNamePattern));
        }
        else
        {
            ftpInboundFileSynchronizer = new FtpInboundFileSynchronizer(sessionFactory);
            ((FtpInboundFileSynchronizer) ftpInboundFileSynchronizer).setFilter(new FtpSimplePatternFileListFilter(fileNamePattern));
        }
        ftpInboundFileSynchronizer.setRemoteDirectory(remoteDirectory);
    }

    // Removes local copies left by a previous run so they are not mistaken for fresh downloads.
    private void deleteLocalFiles()
    {
        if (deleteLocalFiles)
        {
            SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
            List<File> matchingFiles = filter.filterFiles(localDirectory.listFiles());
            if (CollectionUtils.isNotEmpty(matchingFiles))
            {
                for (File file : matchingFiles)
                {
                    FileUtils.deleteQuietly(file);
                }
            }
        }
    }

    /* (non-Javadoc)
     * @see org.springframework.batch.core.step.tasklet.Tasklet#execute(org.springframework.batch.core.StepContribution, org.springframework.batch.core.scope.context.ChunkContext)
     */
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception
    {
        deleteLocalFiles();

        ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);

        if (retryIfNotFound)
        {
            SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
            int attemptCount = 1;
            while (filter.filterFiles(localDirectory.listFiles()).isEmpty() && attemptCount <= downloadFileAttempts)
            {
                logger.info("File(s) matching " + fileNamePattern + " not found on remote site. Attempt " + attemptCount + " out of " + downloadFileAttempts);
                Thread.sleep(retryIntervalMilliseconds);
                ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);
                attemptCount++;
            }

            // The loop only ends without a match once every retry has been used.
            if (filter.filterFiles(localDirectory.listFiles()).isEmpty())
            {
                throw new FileNotFoundException("Could not find remote file(s) matching " + fileNamePattern + " after " + downloadFileAttempts + " attempts.");
            }
        }

        // FINISHED tells Spring Batch not to call execute() again (a null return is treated the same way).
        return RepeatStatus.FINISHED;
    }
}
```
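Both deleteLocalFiles() and the retry check decide what counts as "found" the same way. They list localDirectory and keep the entries whose names match fileNamePattern through SimplePatternFileListFilter, which its documentation describes as supporting Ant-style wildcards such as `*` and `?`. To reason about that check without Spring on the classpath, here is a rough JDK-only analogue. It assumes java.nio glob syntax is close enough to Spring's matcher for simple file names, which is an approximation: glob also understands `{a,b}` and `[...]`. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class LocalFileMatcher
{
    /**
     * Returns the entries of dir whose file names match a glob such as "README" or "data-*.txt".
     * Only the file name is compared, and directories are not excluded.
     */
    public static List<Path> matchingFiles(Path dir, String globPattern) throws IOException
    {
        List<Path> matches = new ArrayList<>();
        if (!Files.isDirectory(dir))
        {
            return matches; // a missing local directory simply yields no matches
        }
        // newDirectoryStream(dir, glob) matches the glob against each entry's file name.
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir, globPattern))
        {
            for (Path entry : entries)
            {
                matches.add(entry);
            }
        }
        return matches;
    }
}
```

An empty result here corresponds to the condition under which the Tasklet sleeps and synchronizes again.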
And the important FTP configuration pieces:
```java
@Bean
public SessionFactory myFtpSessionFactory()
{
    DefaultFtpSessionFactory ftpSessionFactory = new DefaultFtpSessionFactory();
    ftpSessionFactory.setHost("ftp.gnu.org");
    ftpSessionFactory.setClientMode(0); // FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE
    ftpSessionFactory.setFileType(0);   // FTP.ASCII_FILE_TYPE
    ftpSessionFactory.setPort(21);
    ftpSessionFactory.setUsername("anonymous");
    ftpSessionFactory.setPassword("anonymous");
    return ftpSessionFactory;
}

@Bean
@Scope(value = "step")
public FtpGetRemoteFilesTasklet myFtpGetRemoteFilesTasklet()
{
    FtpGetRemoteFilesTasklet ftpTasklet = new FtpGetRemoteFilesTasklet();
    ftpTasklet.setRetryIfNotFound(true);
    ftpTasklet.setDownloadFileAttempts(3);
    ftpTasklet.setRetryIntervalMilliseconds(10000);
    ftpTasklet.setFileNamePattern("README");
    // Use a name that does not exist on the server to exercise the retry logic:
    //ftpTasklet.setFileNamePattern("TestFile");
    ftpTasklet.setRemoteDirectory("/");
    ftpTasklet.setLocalDirectory(new File(System.getProperty("java.io.tmpdir")));
    ftpTasklet.setSessionFactory(myFtpSessionFactory()); // call the @Bean method to get the shared factory
    return ftpTasklet;
}
```
You can download the full example from GitHub; the project is spring-batch-ftp. Run the ExampleJobConfigurationTests JUnit test to see the code in action. The FTP site I tested against is ftp.gnu.org. To see the retry functionality, set fileNamePattern on the FtpGetRemoteFilesTasklet to a name that does not exist on the FTP site. Once all attempts have been exhausted, a FileNotFoundException is thrown.
I intend to write more Spring Batch posts based on my experience converting mainframe batch applications to Spring Batch, so I hope my first Spring Batch post will not be my last!
This was REALLY helpful for the project I am working on. Any ideas on polling multiple directories on the same FTP site?
If it is a handful of directories, you can configure multiple steps that reuse the same configuration and change only the remote directory. Alternatively, you could pass in a List of directories, iterate through them, and call ftpInboundFileSynchronizer.setRemoteDirectory before each synchronization.
If you are talking about a large number of directories and subdirectories that you need to recurse through, you could extend the FtpInboundFileSynchronizer class. See post #7 in the following Spring forum thread: Inbound-FTP-Polling-sub-directories
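A minimal sketch of the List-of-directories approach: one synchronizer is re-pointed at each remote directory in turn. To keep it framework-free, RemoteSynchronizer is a hypothetical stand-in interface that mirrors the two AbstractInboundFileSynchronizer methods the Tasklet already calls, setRemoteDirectory and synchronizeToLocalDirectory. In the real Tasklet you would call those methods on ftpInboundFileSynchronizer directly, and MultiDirectorySync is likewise a made-up name.

```java
import java.io.File;
import java.util.List;

public class MultiDirectorySync
{
    /** Hypothetical stand-in for the two synchronizer methods used by the Tasklet. */
    public interface RemoteSynchronizer
    {
        void setRemoteDirectory(String remoteDirectory);

        void synchronizeToLocalDirectory(File localDirectory);
    }

    /** Points the synchronizer at each remote directory in turn and downloads into localDirectory. */
    public static void synchronizeAll(RemoteSynchronizer synchronizer, List<String> remoteDirectories, File localDirectory)
    {
        for (String remoteDirectory : remoteDirectories)
        {
            synchronizer.setRemoteDirectory(remoteDirectory); // switch directories before each sync
            synchronizer.synchronizeToLocalDirectory(localDirectory);
        }
    }
}
```

Every directory downloads into the same localDirectory, so identically named files in different remote directories can collide. Giving each remote directory its own local subdirectory is one way around that.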
Thanks