
Spring Batch: Creating an FTP Tasklet to get remote files

December 21, 2012

First, if you are new to Spring Batch, check out the Spring Batch reference documentation and/or this introduction blog post.

In a recent project, I was involved in converting hundreds of mainframe jobs to Spring Batch jobs.  Some of these jobs retrieved one or more files from a vendor, either by FTP (usually with PGP encryption) or SFTP, and then processed them.  I had used Spring Integration in the past to set up FTP polling with great success, but I wanted to be able to use this functionality in the context of a Spring Batch step.  Having the FTP transfer in a step made things easier from an operational perspective, since the transfer became part of the job.  For example, in the case of a restart, an FTP step that had already completed could be skipped.

So I ended up creating a Tasklet that among other things could:

  1. Poll an FTP site for files based on a file name pattern and download the files.
  2. Configure a polling interval and a number of attempts to locate a file(s).

In the execution of the Tasklet, I used the FtpInboundFileSynchronizer and SftpInboundFileSynchronizer from Spring Integration to download the files from the remote site.  Set the retryIfNotFound attribute to true if you want the Tasklet to try again when no matching file is found.  The retry behavior is configured with the downloadFileAttempts attribute (default 12) and the retryIntervalMilliseconds attribute (default 300000, i.e. five minutes).
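To make the retry behavior concrete, here is a minimal sketch of the polling loop in plain Java, with no Spring classes involved. The names RetryPollingSketch, Sleeper, and pollUntilFound are hypothetical and exist only for illustration; the downloadAndCheck callback stands in for "synchronize, then check the local directory for a match". It follows the same shape as the Tasklet: one initial attempt, then up to maxRetries further attempts separated by a pause.

```java
import java.io.FileNotFoundException;
import java.util.function.BooleanSupplier;

class RetryPollingSketch {

    /** Hypothetical stand-in for Thread.sleep, so callers can skip the real wait. */
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    /**
     * Makes one initial attempt, then retries up to maxRetries times,
     * pausing intervalMillis before each retry.
     *
     * @return the number of retries that were needed (0 if the first attempt succeeded)
     * @throws FileNotFoundException if every attempt comes back empty
     */
    static int pollUntilFound(BooleanSupplier downloadAndCheck, int maxRetries,
                              long intervalMillis, Sleeper sleeper)
            throws FileNotFoundException, InterruptedException {
        boolean found = downloadAndCheck.getAsBoolean();
        int retries = 0;
        while (!found && retries < maxRetries) {
            sleeper.sleep(intervalMillis);
            found = downloadAndCheck.getAsBoolean();
            retries++;
        }
        if (!found) {
            throw new FileNotFoundException(
                    "No matching file after " + maxRetries + " retries.");
        }
        return retries;
    }

    public static void main(String[] args) throws Exception {
        // Simulated remote site: the file "appears" on the third check.
        int[] checks = {0};
        int retries = pollUntilFound(() -> ++checks[0] >= 3, 5, 100L, Thread::sleep);
        System.out.println("Found after " + retries + " retries");
    }
}
```

Passing the sleep in as a parameter is a design choice for this sketch only; it lets the loop be exercised without actually waiting for the interval.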

In a simple re-creation, here is the Tasklet minus getters and setters:

package org.reil.example;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.List;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer;
import org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter;
import org.springframework.integration.ftp.inbound.FtpInboundFileSynchronizer;
import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.util.Assert;

public class FtpGetRemoteFilesTasklet implements Tasklet, InitializingBean
{
    private Logger logger = LoggerFactory.getLogger(FtpGetRemoteFilesTasklet.class);
    private File localDirectory;
    private AbstractInboundFileSynchronizer<?> ftpInboundFileSynchronizer;
    private SessionFactory sessionFactory;
    private boolean autoCreateLocalDirectory = true;
    private boolean deleteLocalFiles = true;
    private String fileNamePattern;
    private String remoteDirectory;
    private int downloadFileAttempts = 12;
    private long retryIntervalMilliseconds = 300000;
    private boolean retryIfNotFound = false;


    /* (non-Javadoc)
     * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
     */
    public void afterPropertiesSet() throws Exception
    {
        Assert.notNull(sessionFactory, "sessionFactory attribute cannot be null");
        Assert.notNull(localDirectory, "localDirectory attribute cannot be null");
        Assert.notNull(remoteDirectory, "remoteDirectory attribute cannot be null");
        Assert.notNull(fileNamePattern, "fileNamePattern attribute cannot be null");
      
        setupFileSynchronizer();

        if (!this.localDirectory.exists())
        {
            if (this.autoCreateLocalDirectory)
            {
                if (logger.isDebugEnabled())
                {
                    logger.debug("The '" + this.localDirectory + "' directory doesn't exist; Will create.");
                }
                this.localDirectory.mkdirs();
            }
            else
            {
                throw new FileNotFoundException(this.localDirectory.getName());
            }
        }
    }

    private void setupFileSynchronizer()
    {
        if (isSftp())
        {
            ftpInboundFileSynchronizer = new SftpInboundFileSynchronizer(sessionFactory);
            ((SftpInboundFileSynchronizer) ftpInboundFileSynchronizer).setFilter(new SftpSimplePatternFileListFilter(fileNamePattern));
        }
        else
        {
            ftpInboundFileSynchronizer = new FtpInboundFileSynchronizer(sessionFactory);
            ((FtpInboundFileSynchronizer) ftpInboundFileSynchronizer).setFilter(new FtpSimplePatternFileListFilter(fileNamePattern));
        }
        ftpInboundFileSynchronizer.setRemoteDirectory(remoteDirectory);
    }
    
    private void deleteLocalFiles()
    {
        if (deleteLocalFiles)
        {
            SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
            List<File> matchingFiles = filter.filterFiles(localDirectory.listFiles());
            if (CollectionUtils.isNotEmpty(matchingFiles))
            {
                for (File file : matchingFiles)
                {
                    FileUtils.deleteQuietly(file);
                }
            }
        }
    }

    /* (non-Javadoc)
     * @see org.springframework.batch.core.step.tasklet.Tasklet#execute(org.springframework.batch.core.StepContribution, org.springframework.batch.core.scope.context.ChunkContext)
     */
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception
    {
        deleteLocalFiles();

        ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);

        if (retryIfNotFound)
        {
            SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
            int attemptCount = 1;
            while (filter.filterFiles(localDirectory.listFiles()).size() == 0 && attemptCount <= downloadFileAttempts)
            {
                logger.info("File(s) matching " + fileNamePattern + " not found on remote site.  Attempt " + attemptCount + " out of " + downloadFileAttempts);
                Thread.sleep(retryIntervalMilliseconds);
                ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);
                attemptCount++;
            }

            if (attemptCount >= downloadFileAttempts && filter.filterFiles(localDirectory.listFiles()).size() == 0)
            {
                throw new FileNotFoundException("Could not find remote file(s) matching " + fileNamePattern + " after " + downloadFileAttempts + " attempts.");
            }
        }

        return RepeatStatus.FINISHED;
    }
}

And the important FTP configuration pieces:

    @Bean
    public SessionFactory myFtpSessionFactory()
    {
        DefaultFtpSessionFactory ftpSessionFactory = new DefaultFtpSessionFactory();
        ftpSessionFactory.setHost("ftp.gnu.org");
        ftpSessionFactory.setClientMode(0); // 0 = FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE
        ftpSessionFactory.setFileType(0);   // 0 = FTP.ASCII_FILE_TYPE
        ftpSessionFactory.setPort(21);
        ftpSessionFactory.setUsername("anonymous");
        ftpSessionFactory.setPassword("anonymous");
        
        return ftpSessionFactory;
    }

    @Bean
    @Scope(value="step")
    public FtpGetRemoteFilesTasklet myFtpGetRemoteFilesTasklet()
    {
        FtpGetRemoteFilesTasklet  ftpTasklet = new FtpGetRemoteFilesTasklet();
        ftpTasklet.setRetryIfNotFound(true);
        ftpTasklet.setDownloadFileAttempts(3);
        ftpTasklet.setRetryIntervalMilliseconds(10000);
        ftpTasklet.setFileNamePattern("README");
        //ftpTasklet.setFileNamePattern("TestFile");
        ftpTasklet.setRemoteDirectory("/");
        ftpTasklet.setLocalDirectory(new File(System.getProperty("java.io.tmpdir")));
        ftpTasklet.setSessionFactory(myFtpSessionFactory());
        
        return ftpTasklet;
    }

The full example is available on GitHub in the spring-batch-ftp project. Run the ExampleJobConfigurationTests JUnit test to see the code in action. The FTP site I used for testing is ftp.gnu.org. To see the retry functionality, set the fileNamePattern on the FtpGetRemoteFilesTasklet to a name that does not exist on the FTP site. When all attempts have been exhausted, a FileNotFoundException is thrown.

I intend to write more Spring Batch posts based on my experiences converting mainframe batch applications to Spring Batch. Here's hoping my first Spring Batch post will not be my last!


14 Comments
  1. RobBe permalink

    This was REALLY helpful for the project I am working on. Any ideas on polling multiple directories on the same FTP site?

    • If it is a handful of directories, you can configure multiple steps that reuse the same configuration and change only the remote directory. Alternatively, you could pass in a List of directories, iterate through them, and call ftpInboundFileSynchronizer.setRemoteDirectory before each synchronization.

      If you are talking about a large number of directories and subdirectories that you need to recurse, you could extend the FtpInboundFileSynchronizer class. See post #7 in the following spring forum post. Inbound-FTP-Polling-sub-directories
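
      The List-of-directories idea can be sketched roughly as follows. This is only an illustration under assumptions: RemoteSynchronizer is a hypothetical interface standing in for the two synchronizer calls the Tasklet already makes (setRemoteDirectory and synchronizeToLocalDirectory), and MultiDirectorySyncSketch and synchronizeAll are made-up names, not part of the example project.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;

class MultiDirectorySyncSketch {

    /** Hypothetical stand-in for the two synchronizer methods used by the Tasklet. */
    interface RemoteSynchronizer {
        void setRemoteDirectory(String remoteDirectory);
        void synchronizeToLocalDirectory(File localDirectory);
    }

    /** Points the synchronizer at each remote directory in turn and synchronizes it. */
    static void synchronizeAll(RemoteSynchronizer synchronizer,
                               List<String> remoteDirectories,
                               File localDirectory) {
        for (String remoteDirectory : remoteDirectories) {
            synchronizer.setRemoteDirectory(remoteDirectory);
            synchronizer.synchronizeToLocalDirectory(localDirectory);
        }
    }

    public static void main(String[] args) {
        // A fake synchronizer that only prints what it would do.
        RemoteSynchronizer printing = new RemoteSynchronizer() {
            private String current;
            public void setRemoteDirectory(String remoteDirectory) {
                current = remoteDirectory;
            }
            public void synchronizeToLocalDirectory(File localDirectory) {
                System.out.println("sync " + current + " -> " + localDirectory);
            }
        };
        synchronizeAll(printing, Arrays.asList("/inbound/a", "/inbound/b"),
                new File(System.getProperty("java.io.tmpdir")));
    }
}
```

      Since every directory is synchronized into the same local directory here, files with the same name in different remote directories would likely collide, so a separate local directory per remote directory may be the safer choice.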

  2. Hey There. I discovered your weblog the use of msn. This is an extremely smartly written article. I’ll make sure to bookmark it and return to read extra of your useful info. Thank you for the post. I will certainly return.

  3. kiran venampelli permalink

    As I observe the API, it's internally using (FTPClient) client.retrieveFile(path, fos) to copy the file to a FileOutputStream, but it's not using any ItemReader/ItemWriter. So is this really using Spring Batch techniques?

    • Hi Kiran. I would say that it is using Spring Batch techniques because you are just creating a Tasklet. You can write tasklets that do whatever you want in a Spring Batch step (validation, send emails, execute legacy code, etc). ItemReader/ItemWriter is used with the chunk oriented processing style. I would use chunk oriented processing to process the files, after they have been downloaded in the FTP step. Thanks!

      • kiran vennampelli permalink

        Is there any way we can download FTP content using Spring Batch chunks? My requirement is to download content from FTP faster. Currently we have a system that uses the Apache FTP API to download files. Can we do the same using Spring Batch?

  4. Skylab permalink

    Hi,

    Could anyone tell the technique to Spring Batch: Creating an HTTP Tasklet to get remote files.

    Thanks in advance.

  5. aaronf permalink

    This is fantastic and super helpful! Thank you. Do you know if there is an easy way to detect if a file exists before attempting to download it? Or if you can get a count of the files that match the fileNamePattern on the remote system? It seems like all i can call from the Tasklet is synchronizeToLocalDirectory. I’m trying to incorporate some audit trails in my job as I go. Thanks again for sharing your project.

    • Since you have access to the sessionFactory, you can do something like the following before you synchronize. The generic type will be different for SFTP.


      // The tasklet's sessionFactory field is a raw type, so this assignment is unchecked.
      Session<FTPFile> session = null;
      try {
          session = this.sessionFactory.getSession();
          Assert.state(session != null, "failed to acquire a Session");
          // List everything in the remote directory, then apply the same pattern filter.
          FTPFile[] files = session.list(this.remoteDirectory);
          if (!ObjectUtils.isEmpty(files)) {
              FtpSimplePatternFileListFilter filter = new FtpSimplePatternFileListFilter(fileNamePattern);
              List<FTPFile> filteredFiles = filter.filterFiles(files);
              logger.info("matching files count: " + filteredFiles.size());
              for (FTPFile file : filteredFiles) {
                  if (file != null) {
                      logger.info("matching file: " + file.getName());
                  }
              }
          }
      }
      catch (IOException e) {
          throw new RuntimeException("Error listing files", e);
      }
      finally {
          // Always release the session, ignoring any failure on close.
          if (session != null) {
              try {
                  session.close();
              }
              catch (Exception ignored) {
              }
          }
      }

  6. transded permalink

    Thanks, this is very helpful. Can I ask you something: how do I create an FTP step to put a file?
    Thanks in advance.
