AbstractFileOutputWriter Generating duplicate tmp files

By : atchn
Source: Stackoverflow.com

I have an Apache Apex application that consumes Kafka logs and writes them to HDFS.

The DAG is simple: a Kafka consumer (20 partitions, 2 GB of memory per operator) connected by a stream to a "MyWriter extends AbstractFileOutputOperator".

Issue: the writer repeatedly writes .tmp files with the same size and the same data. I have tried increasing the writer operator's memory, increasing the number of writer partitions, and so on, but the issue keeps happening.

I also tried adding and removing requestFinalize in MyWriter, with the same result.

    public void endWindow()
        if (null != fileName) {

This is a subset of my properties.xml





    <value>1000000000</value> <!-- 1 GB File -->

This is the stack trace I was able to get from dt.log for the operator. The operator gets redeployed, probably in different containers, throws this exception, and keeps writing the duplicate files.

 java.lang.RuntimeException: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
        at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:418)
        at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:112)
        at com.datatorrent.stram.engine.Node.setup(Node.java:187)
        at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1309)
        at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:130)
        at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)
    Caused by: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
        at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1219)
        at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1211)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1211)
        at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:411)
        ... 5 more
2016-08-17 22:17:01,108 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [161, 177]
2016-08-17 22:17:01,116 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy complete.
2016-08-17 22:17:02,121 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:02,625 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:03,129 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.


The code for the base operator is at the following link and is referenced in the comments below: https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java

By setting the max file size to 1GB, you automatically enable rolling files; the relevant fields are:

protected Long maxLength = Long.MAX_VALUE;
protected transient boolean rollingFile = false;

The latter is set to true in the setup() method if the former has a value less than the default value of Long.MAX_VALUE.
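To make the switch concrete, here is a minimal plain-Java sketch of the condition described above. It is a stand-alone model, not the Malhar source: the class name RollingFileSketch and the helper isRollingEnabled are made up for illustration, and only the Long.MAX_VALUE default and the 1000000000 value come from this thread.

```java
// Plain-Java model of the rolling-file switch; NOT the Malhar implementation.
// RollingFileSketch and isRollingEnabled are hypothetical names.
class RollingFileSketch {
    // Mirrors the default quoted above: no size limit unless one is configured.
    static final long DEFAULT_MAX_LENGTH = Long.MAX_VALUE;

    // Rolling is considered enabled whenever the configured limit is below the default.
    static boolean isRollingEnabled(long maxLength) {
        return maxLength < DEFAULT_MAX_LENGTH;
    }

    public static void main(String[] args) {
        long oneGb = 1_000_000_000L; // the value from the question's properties.xml
        System.out.println("default  -> rolling = " + isRollingEnabled(DEFAULT_MAX_LENGTH));
        System.out.println("1 GB cap -> rolling = " + isRollingEnabled(oneGb));
    }
}
```

With the default the model reports rolling as off; with the 1 GB cap from the question it reports rolling as on, which is why the configuration in the question ends up with rolling files.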

When rolling files are enabled, file finalization is done automatically, so you should not call requestFinalize().

Secondly, in your MyWriter class, remove the endWindow() override. In the setup() method, build a file name that includes the operator id, and return that name from the getFileName() override; this ensures that multiple partitions don't step on one another. For example:

private String fileName;           // current base file name

private transient String fName;    // per partition file name

@Override
public void setup(Context.OperatorContext context)
{
  // create file name for this partition by appending the operator id to
  // the base name
  long id = context.getId();
  fName = fileName + "_p" + id;
  super.setup(context);   // let the base operator finish its own initialization

  LOG.debug("Leaving setup, fName = {}, id = {}", fName, id);
}

@Override
protected String getFileName(Long[] tuple)
{
  return fName;
}

The file base name (fileName in the code above) can be set directly in the code or initialized from a property in an XML file (you'll need to add a getter and setter for it as well).

You can see an example of this type of usage at: https://github.com/DataTorrent/examples/tree/master/tutorials/fileOutput
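The naming rule itself can be tried without a cluster. The sketch below is a plain-Java stand-in, not an Apex operator: the class PartitionFileNameSketch, the method partitionFileName, and the base name "inventoryCount" are illustrative assumptions, while the "_p" suffix rule is the one from the snippet above and 161 and 177 are the operator ids that appear in the undeploy log line in the question.

```java
// Plain-Java stand-in for the per-partition naming rule; NOT an Apex operator.
// In the real writer the id would come from context.getId() inside setup().
class PartitionFileNameSketch {
    private String fileName; // base name, settable from configuration

    // Getter and setter so the base name can be initialized from a property.
    public String getFileName() { return fileName; }
    public void setFileName(String fileName) { this.fileName = fileName; }

    // Same rule as the answer's snippet: base name + "_p" + operator id.
    String partitionFileName(long operatorId) {
        return fileName + "_p" + operatorId;
    }

    public static void main(String[] args) {
        PartitionFileNameSketch writer = new PartitionFileNameSketch();
        writer.setFileName("inventoryCount"); // hypothetical base name
        System.out.println(writer.partitionFileName(161)); // prints inventoryCount_p161
        System.out.println(writer.partitionFileName(177)); // prints inventoryCount_p177
    }
}
```

Because each partition has a distinct operator id, each one derives a distinct file name, so no two partitions write to the same path.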

A couple of additional suggestions:

  1. Set the partition count to 1 (or comment out the XML that sets the PARTITIONER attribute) and make sure everything works as expected. This will eliminate any issues that are not partitioning related. If possible, also reduce the max file size to, say, 2K or 4K so testing is easier.
  2. Once the single-partition case works, increase the number of partitions to 2. If this works, arbitrarily larger numbers (within reason) should also work.
