python 2.7 - How to use StreamXmlRecordReader to parse single & multi-line XML records within a single file


I have the input file (txt) below:

<a><b><c>val1</c></b></a>||<a><b><c>val2</c></b></a>||<a><b>
<c>val3</c></b></a>||<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>

If you observe the input carefully, the XML record after the third '||' is split across 2 lines.

I want to use the StreamXmlRecordReader of Hadoop Streaming to parse this file:

-inputreader "org.apache.hadoop.streaming.StreamXmlRecordReader,begin=<a>,end=</a>,slowmatch=true"

which is unable to parse the 3rd record.

I am getting the error below:

Traceback (most recent call last):
  File "/home/rsome/test/code/m1.py", line 13, in <module>
    root = ET.fromstring(xml_str.getvalue())
  File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 964, in XML
    return parser.close()
  File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 1254, in close
    self._parser.Parse("", 1) # end of data
xml.parsers.expat.ExpatError: no element found: line 1, column 18478
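The ExpatError is just ElementTree reporting that the record handed to the mapper is truncated: the reader stopped at the newline, so the fragment has no matching end tags. A minimal repro of that failure mode (tag names are taken from the sample input; this is an illustration, not the actual mapper code):

```python
import xml.etree.ElementTree as ET

# A truncated fragment, as emitted for the 3rd record: closing tags missing.
truncated = "<a><b>"
try:
    ET.fromstring(truncated)
except ET.ParseError as e:
    # Reports "no element found", the same error as in the traceback above
    print("parse failed:", e)

# Once the two lines are stitched back together, the record parses fine.
root = ET.fromstring("<a><b><c>val3</c></b></a>")
print(root.find("b/c").text)  # -> val3
```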

I have used slowmatch=true, but still no luck.

My output is coming out as below:

$ hdfs dfs -text /poc/testout001/part-*
rec::1::mapper1    <a><b><c>val1</c></b></a>
rec::2::mapper1    <a><b><c>val2</c></b></a>
rec::3::mapper1    <a><b>
rec::4::mapper1    <c>val3</c></b></a>
rec::1::mapper2    <a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>

My expected output is:

$ hdfs dfs -text /poc/testout001/part-*
rec::1::mapper1    <a><b><c>val1</c></b></a>
rec::2::mapper1    <a><b><c>val2</c></b></a>
rec::3::mapper1    <a><b><c>val3</c></b></a>
rec::1::mapper2    <a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>

Any help on this would be of great help.

Basically, StreamXmlInputFormat, Hadoop Streaming's default XML input format, extends KeyValueTextInputFormat, which splits lines at the newline characters (\r\n). That is not what we want here, since the record is split across multiple lines.

Hence, to overcome this, I implemented my own input format extending FileInputFormat, where I had the liberty to read past the newline chars (\r\n) until the end tag.
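The core idea — scan for the start tag, then keep consuming bytes until the end tag while dropping \r/\n — can be sketched in Python (a simplified in-memory illustration, not the actual record reader; the function name and signature are made up):

```python
def read_records(text, start_tag="<a>", end_tag="</a>"):
    """Yield complete records between start_tag and end_tag,
    ignoring newlines so a record may span multiple lines."""
    # Drop newline characters, mirroring what the record reader does
    # when it skips '\r' and '\n' inside a record.
    data = text.replace("\r", "").replace("\n", "")
    pos = 0
    while True:
        s = data.find(start_tag, pos)
        if s == -1:
            return
        e = data.find(end_tag, s)
        if e == -1:
            return
        yield data[s:e + len(end_tag)]
        pos = e + len(end_tag)

# The 3rd record from the question, split across two lines:
sample = "<a><b><c>val1</c></b></a>||<a><b>\n<c>val3</c></b></a>"
print(list(read_records(sample)))
# -> ['<a><b><c>val1</c></b></a>', '<a><b><c>val3</c></b></a>']
```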

usage:

-libjars /path/to/custom-xml-input-format-1.0.0.jar \
-D xmlinput.start="<a>" \
-D xmlinput.end="</a>" \
-inputformat "my.package.CustomXmlInputFormat"

Here's the code I used:

import java.io.*;
import java.lang.reflect.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.streaming.*;

public class CustomXmlInputFormat extends FileInputFormat {

  public static final String START_TAG_KEY = "xmlinput.start";
  public static final String END_TAG_KEY = "xmlinput.end";

  @SuppressWarnings("unchecked")
  @Override
  public RecordReader<LongWritable, Text> getRecordReader(final InputSplit genericSplit,
                                      JobConf job, Reporter reporter) throws IOException {
    return new XmlRecordReader((FileSplit) genericSplit, job, reporter);
  }

  public static class XmlRecordReader implements RecordReader<LongWritable, Text> {

    private final byte[] endTag;
    private final byte[] startTag;
    private final long start;
    private final long end;
    private final FSDataInputStream fsin;
    private final DataOutputBuffer buffer = new DataOutputBuffer();
    private LongWritable currentKey;
    private Text currentValue;

    public XmlRecordReader(FileSplit split, JobConf conf, Reporter reporter) throws IOException {
      startTag = conf.get(START_TAG_KEY).getBytes("UTF-8");
      endTag = conf.get(END_TAG_KEY).getBytes("UTF-8");

      start = split.getStart();
      end = start + split.getLength();
      Path file = split.getPath();
      FileSystem fs = file.getFileSystem(conf);
      fsin = fs.open(split.getPath());
      fsin.seek(start);
    }

    public boolean next(LongWritable key, Text value) throws IOException {
      if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
        try {
          buffer.write(startTag);
          if (readUntilMatch(endTag, true)) {
            key.set(fsin.getPos());
            value.set(buffer.getData(), 0, buffer.getLength());
            return true;
          }
        } finally {
          buffer.reset();
        }
      }
      return false;
    }

    public boolean readUntilMatch(byte[] match, boolean withinBlock)
        throws IOException {
      int i = 0;
      while (true) {
        int b = fsin.read();
        if (b == -1) {
          return false;
        }

        // Skip newline chars so a record split across lines is stitched back together
        if (withinBlock && b != (byte) '\r' && b != (byte) '\n') {
          buffer.write(b);
        }

        if (b == match[i]) {
          i++;
          if (i >= match.length) {
            return true;
          }
        } else {
          i = 0;
        }

        if (!withinBlock && i == 0 && fsin.getPos() >= end) {
          return false;
        }
      }
    }

    @Override
    public float getProgress() throws IOException {
      return (fsin.getPos() - start) / (float) (end - start);
    }

    @Override
    public synchronized long getPos() throws IOException {
      return fsin.getPos();
    }

    @Override
    public LongWritable createKey() {
      return new LongWritable();
    }

    @Override
    public Text createValue() {
      return new Text();
    }

    @Override
    public synchronized void close() throws IOException {
      fsin.close();
    }
  }
}

Here's the output:

$ hdfs dfs -text /poc/testout001/part-*
25      <a><b><c>val1</c></b></a>
52      <a><b><c>val2</c></b></a>
80      <a><b><c>val3</c></b></a>
141     <a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>
