
Spring Batch: Handling a non-standard file format

Most of the Spring Batch jobs I have written consume files in some type of standard delimited or fixed-width format. But what happens when multiple lines make up a record and a marker indicates the end of each record? Here is some example data where the end of a record is marked with an eor tag:


Mike
Smith
555-1234
eor
Sam
Johnson
555-2311
eor
Dave
Williams
555-9999
eor

One way to handle this is to define a custom RecordSeparatorPolicy. Spring already provides SuffixRecordSeparatorPolicy, which lets you look for a specific String that terminates a record. Now that we have identified how to indicate the end of the record, we want to treat the file like a delimited file. To do this, we extend SuffixRecordSeparatorPolicy and override the preProcess method to add a delimiter to the line. We create a CustomRecordSeparatorPolicy to do this.

 package org.reil.example;  
   
 import org.springframework.batch.item.file.separator.SuffixRecordSeparatorPolicy;  
   
 public class CustomRecordSeparatorPolicy extends SuffixRecordSeparatorPolicy {  
        
      private char delimiter = ',';  
        
      @Override  
      public String preProcess(String line) {  
           line = line.trim() + delimiter;  
           return super.preProcess(line);  
      }  
   
      public char getDelimiter() {  
           return delimiter;  
      }  
   
      public void setDelimiter(char delimiter) {  
           this.delimiter = delimiter;  
      }  
        
 }  
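
To make the effect of preProcess concrete, here is a plain-Java walk-through of how a record might get assembled. It is only a sketch and does not use any Spring classes: RecordAssemblySketch and its methods are hypothetical names, and it assumes the reader keeps appending each new line to the pre-processed record until the record ends with the suffix, then cuts the suffix off.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java walk-through of the record assembly; no Spring classes are used.
class RecordAssemblySketch {

    // Mirrors CustomRecordSeparatorPolicy.preProcess: trim, then append the delimiter.
    static String preProcess(String record, char delimiter) {
        return record.trim() + delimiter;
    }

    // Assumed reader behaviour: append lines until the record ends with the suffix,
    // then strip the suffix from the finished record.
    static List<String> assemble(List<String> lines, String suffix, char delimiter) {
        List<String> records = new ArrayList<>();
        String record = null;
        for (String line : lines) {
            record = (record == null) ? line : preProcess(record, delimiter) + line;
            if (record.trim().endsWith(suffix)) {
                records.add(record.substring(0, record.lastIndexOf(suffix)));
                record = null; // start a new record on the next line
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "Mike", "Smith", "555-1234", "eor",
                "Sam", "Johnson", "555-2311", "eor");
        for (String record : assemble(lines, "eor", ',')) {
            System.out.println(record);
        }
    }
}
```

In this sketch each assembled record keeps a trailing delimiter (for example Mike,Smith,555-1234,), because the delimiter is added before the eor line is appended. A tokenizer would see that as one extra empty field at the end.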

Now you can set the policy on your FlatFileItemReader and process the file like a normal delimited file. Example:

       FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();  
       reader.setResource(new ClassPathResource("testFile.txt"));  
       DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<Person>();  
       lineMapper.setFieldSetMapper(new PersonFieldSetMapper());  
       lineMapper.setLineTokenizer(new DelimitedLineTokenizer(','));  
       reader.setLineMapper(lineMapper);  
       //set the policy  
       CustomRecordSeparatorPolicy policy = new CustomRecordSeparatorPolicy();  
       policy.setSuffix("eor");  
       reader.setRecordSeparatorPolicy(policy);  
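
The Person class and PersonFieldSetMapper are not shown here, so the following is only a plain-Java stand-in for what the tokenize-and-map step amounts to. The class name, the field names (firstName, lastName, phone) and the field order are all assumptions based on the sample data, and no Spring classes are involved.

```java
// Hypothetical stand-in for Person and PersonFieldSetMapper; not the classes from the project.
class PersonMappingSketch {

    static final class Person {
        final String firstName;
        final String lastName;
        final String phone;

        Person(String firstName, String lastName, String phone) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.phone = phone;
        }
    }

    // Split one assembled record on the delimiter and map the fields by position.
    static Person map(String record) {
        // The -1 limit keeps trailing empty fields, so a trailing delimiter is visible.
        String[] fields = record.split(",", -1);
        if (fields.length < 3) {
            throw new IllegalArgumentException("Expected at least 3 fields: " + record);
        }
        return new Person(fields[0].trim(), fields[1].trim(), fields[2].trim());
    }

    public static void main(String[] args) {
        Person person = map("Mike,Smith,555-1234,");
        System.out.println(person.firstName + " " + person.lastName + " " + person.phone);
    }
}
```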

You can download the full runnable example from GitHub here. The project is spring-batch-file. Run the JobConfigurationTests JUnit test to run the job.

Check out the Spring Batch reference documentation for more on handling of flat files. Also, for handling multiple record types within a file, take a look at the PatternMatchingCompositeLineMapper.

As a side note, it is really easy to create Spring Batch jobs using the STS Spring Project wizard. Also, there is a quick start here if you are interested in creating a batch job that runs using Spring Boot.

Spring Batch: Creating an FTP Tasklet to get remote files

First, if you are new to Spring Batch, check out the Spring Batch reference documentation and/or this introduction blog post.

In a recent project, I was involved in converting hundreds of mainframe jobs to Spring Batch jobs. Some of these jobs retrieved one or more files from a vendor, either by FTP (usually with PGP encryption) or SFTP, and then processed them. I had used Spring Integration in the past to set up FTP polling with great success, but I wanted to be able to use this functionality in the context of a Spring Batch step. Having the FTP in a step made it easier from an operational perspective since the FTP became part of the job. So, for example, in the case of a restart the FTP step could be skipped.

So I ended up creating a Tasklet that among other things could:

  1. Poll an FTP site for files based on a file name pattern and download the files.
  2. Configure a polling interval and the number of attempts to locate the file(s).

In the execution of the Tasklet, I utilized the FtpInboundFileSynchronizer and SftpInboundFileSynchronizer from Spring Integration to download the files from the remote site.  You could also set the retryIfNotFound attribute to true if you want to retry the download.  The retry behavior can be configured with the downloadFileAttempts and the retryIntervalMilliseconds attributes.
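
Stripped of the FTP details, the retry behavior is a simple poll-and-wait loop. Here is a minimal sketch in plain Java, assuming one initial attempt followed by a configurable number of retries. PollingSketch and pollUntilFound are hypothetical names; the maxRetries and intervalMillis parameters play the roles of downloadFileAttempts and retryIntervalMilliseconds.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper illustrating the poll-and-retry idea; not part of the Tasklet.
class PollingSketch {

    // Runs the attempt once, then retries up to maxRetries more times,
    // sleeping intervalMillis before each retry. Returns true as soon as an attempt succeeds.
    static boolean pollUntilFound(BooleanSupplier attempt, int maxRetries, long intervalMillis) {
        if (attempt.getAsBoolean()) {
            return true;
        }
        for (int retry = 1; retry <= maxRetries; retry++) {
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return false;
            }
            if (attempt.getAsBoolean()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated remote check that only succeeds on the third call.
        boolean found = pollUntilFound(() -> ++calls[0] == 3, 5, 10L);
        System.out.println("found=" + found + " after " + calls[0] + " attempts");
    }
}
```

In the Tasklet, the attempt corresponds to synchronizing the remote directory and checking whether a matching file has arrived locally, and a false result corresponds to throwing an exception.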

In a simple re-creation, here is the Tasklet minus getters and setters:

package org.reil.example;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.List;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer;
import org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter;
import org.springframework.integration.ftp.inbound.FtpInboundFileSynchronizer;
import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.util.Assert;

public class FtpGetRemoteFilesTasklet implements Tasklet, InitializingBean
{
    private Logger logger = LoggerFactory.getLogger(FtpGetRemoteFilesTasklet.class);
    private File localDirectory;
    private AbstractInboundFileSynchronizer<?> ftpInboundFileSynchronizer;
    private SessionFactory sessionFactory;
    private boolean autoCreateLocalDirectory = true;
    private boolean deleteLocalFiles = true;
    private String fileNamePattern;
    private String remoteDirectory;
    private int downloadFileAttempts = 12;
    private long retryIntervalMilliseconds = 300000;
    private boolean retryIfNotFound = false;


    /* (non-Javadoc)
     * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
     */
    public void afterPropertiesSet() throws Exception
    {
        Assert.notNull(sessionFactory, "sessionFactory attribute cannot be null");
        Assert.notNull(localDirectory, "localDirectory attribute cannot be null");
        Assert.notNull(remoteDirectory, "remoteDirectory attribute cannot be null");
        Assert.notNull(fileNamePattern, "fileNamePattern attribute cannot be null");
      
        setupFileSynchronizer();

        if (!this.localDirectory.exists())
        {
            if (this.autoCreateLocalDirectory)
            {
                if (logger.isDebugEnabled())
                {
                    logger.debug("The '" + this.localDirectory + "' directory doesn't exist; Will create.");
                }
                this.localDirectory.mkdirs();
            }
            else
            {
                throw new FileNotFoundException(this.localDirectory.getName());
            }
        }
    }

    private void setupFileSynchronizer()
    {
        if (isSftp())
        {
            ftpInboundFileSynchronizer = new SftpInboundFileSynchronizer(sessionFactory);
            ((SftpInboundFileSynchronizer) ftpInboundFileSynchronizer).setFilter(new SftpSimplePatternFileListFilter(fileNamePattern));
        }
        else
        {
            ftpInboundFileSynchronizer = new FtpInboundFileSynchronizer(sessionFactory);
            ((FtpInboundFileSynchronizer) ftpInboundFileSynchronizer).setFilter(new FtpSimplePatternFileListFilter(fileNamePattern));
        }
        ftpInboundFileSynchronizer.setRemoteDirectory(remoteDirectory);
    }
    
    private void deleteLocalFiles()
    {
        if (deleteLocalFiles)
        {
            SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
            List<File> matchingFiles = filter.filterFiles(localDirectory.listFiles());
            if (CollectionUtils.isNotEmpty(matchingFiles))
            {
                for (File file : matchingFiles)
                {
                    FileUtils.deleteQuietly(file);
                }
            }
        }
    }

    /* (non-Javadoc)
     * @see org.springframework.batch.core.step.tasklet.Tasklet#execute(org.springframework.batch.core.StepContribution, org.springframework.batch.core.scope.context.ChunkContext)
     */
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception
    {
        deleteLocalFiles();

        ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);

        if (retryIfNotFound)
        {
            SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
            int attemptCount = 1;
            while (filter.filterFiles(localDirectory.listFiles()).size() == 0 && attemptCount <= downloadFileAttempts)
            {
                logger.info("File(s) matching " + fileNamePattern + " not found on remote site.  Attempt " + attemptCount + " out of " + downloadFileAttempts);
                Thread.sleep(retryIntervalMilliseconds);
                ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);
                attemptCount++;
            }

            if (attemptCount >= downloadFileAttempts && filter.filterFiles(localDirectory.listFiles()).size() == 0)
            {
                throw new FileNotFoundException("Could not find remote file(s) matching " + fileNamePattern + " after " + downloadFileAttempts + " attempts.");
            }
        }

        // Signal explicitly that the step is complete
        return RepeatStatus.FINISHED;
    }
}

And the important FTP configuration pieces:

    @Bean
    public SessionFactory myFtpSessionFactory()
    {
        DefaultFtpSessionFactory ftpSessionFactory = new DefaultFtpSessionFactory();
        ftpSessionFactory.setHost("ftp.gnu.org");
        ftpSessionFactory.setClientMode(0); // 0 = FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE
        ftpSessionFactory.setFileType(0); // 0 = FTP.ASCII_FILE_TYPE
        ftpSessionFactory.setPort(21);
        ftpSessionFactory.setUsername("anonymous");
        ftpSessionFactory.setPassword("anonymous");
        
        return ftpSessionFactory;
    }

    @Bean
    @Scope(value="step")
    public FtpGetRemoteFilesTasklet myFtpGetRemoteFilesTasklet()
    {
        FtpGetRemoteFilesTasklet  ftpTasklet = new FtpGetRemoteFilesTasklet();
        ftpTasklet.setRetryIfNotFound(true);
        ftpTasklet.setDownloadFileAttempts(3);
        ftpTasklet.setRetryIntervalMilliseconds(10000);
        ftpTasklet.setFileNamePattern("README");
        //ftpTasklet.setFileNamePattern("TestFile");
        ftpTasklet.setRemoteDirectory("/");
        ftpTasklet.setLocalDirectory(new File(System.getProperty("java.io.tmpdir")));
        ftpTasklet.setSessionFactory(myFtpSessionFactory());
        
        return ftpTasklet;
    }

You can download the full example from GitHub here. The project is spring-batch-ftp. Run the ExampleJobConfigurationTests JUnit test to see the code in action. The FTP site I used to test with is ftp.gnu.org. To see the retry functionality, set the fileNamePattern on the FtpGetRemoteFilesTasklet to a name that will not be found on the FTP site. When all attempts have been exhausted, an exception is thrown.

It is my intention to do some more Spring Batch related posts based on my experiences converting mainframe batch applications to Spring Batch. So hoping my first Spring Batch post will not be my last!

Easy Auto Completion with ICEfaces and Glazed Lists

This is my first blog post ever, so please excuse the formatting. Here it goes… Using the ICEfaces selectInputText component, combined with Glazed Lists, I was able to put together a simple auto completion implementation.

First, I created a class called SelectItemModel that handles the data behind JSF SelectItem based components (e.g. selectOneMenu).  I am not going to talk much about this, other than it allowed me to abstract out some of the SelectItem component behavior for the data represented by a list of SelectItems.


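import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.faces.model.SelectItem;

public class SelectItemModel<T>
{
    // Maps each item name to the model object it represents
    private Map<String, T> modelObjectMap = new HashMap<String, T>();
    
    private List<SelectItem> selectItemList = new ArrayList<SelectItem>();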
    
    private String selectedItemName;
    
    private String[] selectedItemNames;
    
    public void clear()
    {
        modelObjectMap.clear();
        selectItemList.clear();
        selectedItemName = null;
        selectedItemNames = null;
    }
    
    public void addSelectItem(String name, String label, T object)
    {
        if (!modelObjectMap.containsKey(name))
        {
            selectItemList.add(new SelectItem(name, label));
        }
        
        modelObjectMap.put(name, object);
    }

    public void addSelectItem(String name, T object)
    {
        if (!modelObjectMap.containsKey(name))
        {
            selectItemList.add(new SelectItem(name));
        }
        
        modelObjectMap.put(name, object);
    }
    
    public void addSelectTitleItem(String name)
    {
        selectItemList.add(0, new SelectItem(name));
    }
    
    public void sortAscending()
    {
        Collections.sort(selectItemList, new Comparator<SelectItem>()
        {

            public int compare(SelectItem o1, SelectItem o2)
            {
                String obj1 = o1.getLabel();
                String obj2 = o2.getLabel();
                
                return obj1.compareTo(obj2);
            }
            
        });
    }
    
    public void sortDescending()
    {
        Collections.sort(selectItemList, new Comparator<SelectItem>()
        {

            public int compare(SelectItem o1, SelectItem o2)
            {
                String obj1 = o1.getLabel();
                String obj2 = o2.getLabel();
                
                return obj2.compareTo(obj1);
            }
            
        });
    }
    
    public int getListCount()
    {
        return selectItemList.size();
    }
    
    /**
     * @return the selectedItemName
     */
    public String getSelectedItemName()
    {
        return selectedItemName;
    }

    /**
     * @param selectedItemName the selectedItemName to set
     */
    public void setSelectedItemName(String selectedItemName)
    {
        this.selectedItemName = selectedItemName;
    }

    /**
     * @return the selectItemList
     */
    public List<SelectItem> getSelectItemList()
    {
        return selectItemList;
    }
    
    public Collection<T> getModelObjects()
    {
        return modelObjectMap.values();
    }

    public T getModelObjectForName(String name)
    {
        return modelObjectMap.get(name);
    }

    /**
     * @return the selectedItemNames
     */
    public String[] getSelectedItemNames()
    {
        return selectedItemNames;
    }

    /**
     * @param selectedItemNames the selectedItemNames to set
     */
    public void setSelectedItemNames(String[] selectedItemNames)
    {
        this.selectedItemNames = selectedItemNames;
    }
   
    public List<String> getSelectItemListAsStrings()
    {
        List<String> stringValues = new ArrayList<String>(selectItemList.size());
        for (SelectItem item : selectItemList)
        {
            stringValues.add((String) item.getValue());
        }
        
        return stringValues;
    }
    
}



Next I created an AutoCompleteBean class that is used by the ICEfaces selectInputText component. This class handles managing of the TextChangeEvent and does the filtering of the data model using Glazed Lists. Glazed Lists is an excellent framework for doing collection filtering using text matching. I had used it in the past in a Swing application and found it plugged nicely into how the ICEfaces selectInputText component is submitting text value changes to the server side to update the backing bean. You can configure the text matching to be either a “starts with” or a “contains” matching.


public class AutoCompleteBean<T>
{
    public enum AutoCompleteMode {
        startsWith,
        contains
    }
    
    private TextMatcherEditor<SelectItem> matcherEditor;
    
    private SelectItemModel<T> selectItemModel = new SelectItemModel<T>();
    
    private List<SelectItem> autoCompleteList;
    
    private String searchValue;
    
    public AutoCompleteBean(SelectItemModel<T> aSelectItemModel, AutoCompleteMode mode)
    {        
        this.selectItemModel = aSelectItemModel;
        matcherEditor = new TextMatcherEditor<SelectItem>(new TextFilterator<SelectItem>() {

            // Tell Glazed Lists which string(s) of each item to match against
            public void getFilterStrings(List<String> stringList, SelectItem item)
            {
                stringList.add(item.getValue().toString());
            }
               
           });
        if (mode.equals(AutoCompleteMode.startsWith))
        {
            matcherEditor.setMode(TextMatcherEditor.STARTS_WITH);
        }
        else if (mode.equals(AutoCompleteMode.contains))
        {
            matcherEditor.setMode(TextMatcherEditor.CONTAINS);
        }
        
        
        EventList<SelectItem> eventList = GlazedLists.eventList(selectItemModel.getSelectItemList());
        autoCompleteList = new FilterList<SelectItem>(eventList, matcherEditor);
    }
    
    public T getResult()
    {
        if(searchValue == null)
            return null;
        
        T result = selectItemModel.getModelObjectForName(searchValue);
        if (result == null)
        {
            result = selectItemModel.getModelObjectForName(searchValue.toUpperCase());
        }
        
        return result;
    }
    
    public T getResult(String key)
    {
        if(key == null)
            return null;
        
        T result = selectItemModel.getModelObjectForName(key);
        if (result == null)
        {
            result = selectItemModel.getModelObjectForName(key.toUpperCase());
        }
        
        return result;
    }
    
    public void autoCompleteTextValueChanged(TextChangeEvent event)
    {
        String value = event.getNewValue().toString();
        searchValue = value;
        matcherEditor.setFilterText(new String[]{value});
    }
    
    /**
     * @return the searchValue
     */
    public String getSearchValue()
    {
        return searchValue;
    }

    /**
     * @param searchValue the searchValue to set
     */
    public void setSearchValue(String searchValue)
    {
        this.searchValue = searchValue;
    }

    /**
     * @return the autoCompleteList
     */
    public List<SelectItem> getAutoCompleteList()
    {
        return autoCompleteList;
    }

    /**
     * @param autoCompleteList the autoCompleteList to set
     */
    public void setAutoCompleteList(List<SelectItem> autoCompleteList)
    {
        this.autoCompleteList = autoCompleteList;
    }
    
}
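
To illustrate the difference between the two matching modes without pulling in Glazed Lists, here is a small plain-Java sketch. MatchModeSketch and its filter method are hypothetical names, and the case-insensitive comparison is an assumption made for this sketch rather than a statement about how Glazed Lists compares characters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Plain-Java illustration of "starts with" versus "contains" matching; no Glazed Lists involved.
class MatchModeSketch {

    enum Mode { STARTS_WITH, CONTAINS }

    // Returns the names that match the typed text under the given mode,
    // preserving the order of the input list. Comparison ignores case (an assumption).
    static List<String> filter(List<String> names, String text, Mode mode) {
        String needle = text.toLowerCase(Locale.ROOT);
        List<String> matches = new ArrayList<>();
        for (String name : names) {
            String candidate = name.toLowerCase(Locale.ROOT);
            boolean hit = (mode == Mode.STARTS_WITH)
                    ? candidate.startsWith(needle)
                    : candidate.contains(needle);
            if (hit) {
                matches.add(name);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Bob", "Tim", "Scott", "John", "Jim",
                "Andrew", "Gavin", "Mary", "Mike", "Becky", "Brad");
        System.out.println(filter(names, "b", Mode.STARTS_WITH));  // [Bob, Becky, Brad]
        System.out.println(filter(names, "im", Mode.CONTAINS));    // [Tim, Jim]
        System.out.println(filter(names, "im", Mode.STARTS_WITH)); // []
    }
}
```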



In the following code, I configure the AutoCompleteBean in my JSF managed bean.


public class AutoCompleteController
{
    private AutoCompleteBean<String> autoCompleteBean;

    public AutoCompleteController()
    {
        SelectItemModel<String> selectItemModel = new SelectItemModel<String>();
        selectItemModel.addSelectItem("Bob", "Bob");
        selectItemModel.addSelectItem("Tim", "Tim");
        selectItemModel.addSelectItem("Scott", "Scott");
        selectItemModel.addSelectItem("John", "John");
        selectItemModel.addSelectItem("Jim", "Jim");
        selectItemModel.addSelectItem("Andrew", "Andrew");
        selectItemModel.addSelectItem("Gavin", "Gavin");
        selectItemModel.addSelectItem("Mary", "Mary");
        selectItemModel.addSelectItem("Mike", "Mike");
        selectItemModel.addSelectItem("Becky", "Becky");
        selectItemModel.addSelectItem("Brad", "Brad");
        autoCompleteBean = new AutoCompleteBean<String>(selectItemModel, AutoCompleteMode.startsWith);
    }

    /**
     * @return the autoCompleteBean
     */
    public AutoCompleteBean<String> getAutoCompleteBean()
    {
        return autoCompleteBean;
    }

    /**
     * @param autoCompleteBean the autoCompleteBean to set
     */
    public void setAutoCompleteBean(AutoCompleteBean<String> autoCompleteBean)
    {
        this.autoCompleteBean = autoCompleteBean;
    }
}



And here is the code for your JSF page where you will be displaying the selectInputText component.


<ice:selectInputText
	id="autoComp"
	partialSubmit="true"
	value="#{autoCompleteController.autoCompleteBean.searchValue}"
	textChangeListener="#{autoCompleteController.autoCompleteBean.autoCompleteTextValueChanged}"
	required="true"
	rows="10">
	<f:selectItems value="#{autoCompleteController.autoCompleteBean.autoCompleteList}" />
</ice:selectInputText>



That’s it! So far it seems to be working well.
