python 2.7 - How to use StreamXmlRecordReader to parse single & multiline XML records within a single file
I have the below input file (txt):
<a><b><c>val1</c></b></a>||<a><b><c>val2</c></b></a>||<a><b>
<c>val3</c></b></a>||<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>
If you observe the input carefully, the XML record after the third '||' is split across two lines.
I want to use the StreamXmlRecordReader of Hadoop Streaming to parse this file:

-inputreader "org.apache.hadoop.streaming.StreamXmlRecordReader,begin=<a>,end=</a>,slowmatch=true"

but it is unable to parse the 3rd record.
I am getting the error below:
Traceback (most recent call last):
  File "/home/rsome/test/code/m1.py", line 13, in <module>
    root = ET.fromstring(xml_str.getvalue())
  File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 964, in XML
    return parser.close()
  File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 1254, in close
    self._parser.Parse("", 1) # end of data
xml.parsers.expat.ExpatError: no element found: line 1, column 18478
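For context, m1.py is not shown, but from the traceback it is presumably a streaming mapper that parses each record it receives on stdin with ElementTree; a minimal sketch of that shape (record counter and output format are illustrative, not the actual script):

# Minimal sketch of what a mapper like m1.py presumably does (assumed shape):
# parse each record arriving on stdin as an XML document.
import sys
import xml.etree.ElementTree as ET

rec_no = 0
for line in sys.stdin:
    record = line.strip()
    if not record:
        continue
    rec_no += 1
    # ET.fromstring() raises xml.parsers.expat.ExpatError ("no element found")
    # on a truncated fragment such as the bare "<a><b>" from the split record.
    root = ET.fromstring(record)
    print "rec::%d::mapper1\t%s" % (rec_no, record)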
I have used slowmatch=true, but still no luck.
My output is coming as below:
$ hdfs dfs -text /poc/testout001/part-*
rec::1::mapper1    <a><b><c>val1</c></b></a>
rec::2::mapper1    <a><b><c>val2</c></b></a>
rec::3::mapper1    <a><b>
rec::4::mapper1    <c>val3</c></b></a>
rec::1::mapper2    <a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>
My expected output is:
$ hdfs dfs -text /poc/testout001/part-*
rec::1::mapper1    <a><b><c>val1</c></b></a>
rec::2::mapper1    <a><b><c>val2</c></b></a>
rec::3::mapper1    <a><b><c>val3</c></b></a>
rec::1::mapper2    <a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>
Any help on this would be of great help.
Basically, StreamXmlInputFormat, hadoop-streaming's default XML input format, extends KeyValueTextInputFormat, which splits records at the newline characters (\r\n). That is not what we expect when a record is split across multiple lines.
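You can reproduce the effect outside Hadoop: a line-oriented reader hands the mapper the third record in two pieces, as a few lines of Python show (assuming the sample input is saved as test.txt, an illustrative name):

# Illustration: line-at-a-time reading breaks the multi-line third record.
# 'test.txt' (an assumed name) holds the sample input shown above.
with open('test.txt') as f:
    lines = f.read().split('\n')

print lines[0].split('||')[2]   # -> "<a><b>"              (record 3, part 1)
print lines[1].split('||')[0]   # -> "<c>val3</c></b></a>" (record 3, part 2)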
Hence, to overcome this, I have implemented my own input format extending FileInputFormat, where I had the liberty to scan further, past the newline chars (\r\n), for the end tag.
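The core idea, sketched in Python for clarity (the real reader, implemented in Java, appears further below): find the start tag, then keep consuming bytes, dropping '\r' and '\n', until the end tag is matched, so each record comes out on a single line. This simplified version reads the whole input into memory, unlike the streaming Java reader:

# Simplified sketch of the reader's core idea; not the actual implementation.
def read_records(stream, start_tag='<a>', end_tag='</a>'):
    data = stream.read()
    pos = 0
    while True:
        s = data.find(start_tag, pos)
        if s == -1:
            return
        e = data.find(end_tag, s)
        if e == -1:
            return
        pos = e + len(end_tag)
        # Strip newline chars so a multi-line record becomes one line.
        yield data[s:pos].replace('\r', '').replace('\n', '')

# e.g. for rec in read_records(open('test.txt')): print rec
# emits "<a><b><c>val3</c></b></a>" as one record despite the line break.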
Usage:

-libjars /path/to/custom-xml-input-format-1.0.0.jar \
-D xmlinput.start="<a>" \
-D xmlinput.end="</a>" \
-inputformat "my.package.CustomXmlInputFormat"
Here's the code I used:
import java.io.*;
import java.lang.reflect.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.streaming.*;

public class CustomXmlInputFormat extends FileInputFormat<LongWritable, Text> {

    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";

    @SuppressWarnings("unchecked")
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(final InputSplit genericSplit,
            JobConf job, Reporter reporter) throws IOException {
        return new XmlRecordReader((FileSplit) genericSplit, job, reporter);
    }

    public static class XmlRecordReader implements RecordReader<LongWritable, Text> {

        private final byte[] endTag;
        private final byte[] startTag;
        private final long start;
        private final long end;
        private final FSDataInputStream fsin;
        private final DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable currentKey;
        private Text currentValue;

        public XmlRecordReader(FileSplit split, JobConf conf, Reporter reporter) throws IOException {
            startTag = conf.get(START_TAG_KEY).getBytes("UTF-8");
            endTag = conf.get(END_TAG_KEY).getBytes("UTF-8");
            start = split.getStart();
            end = start + split.getLength();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(conf);
            fsin = fs.open(split.getPath());
            fsin.seek(start);
        }

        public boolean next(LongWritable key, Text value) throws IOException {
            if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
                try {
                    buffer.write(startTag);
                    if (readUntilMatch(endTag, true)) {
                        key.set(fsin.getPos());
                        value.set(buffer.getData(), 0, buffer.getLength());
                        return true;
                    }
                } finally {
                    buffer.reset();
                }
            }
            return false;
        }

        public boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                // end of file
                if (b == -1) {
                    return false;
                }
                // skip newline chars so a record may span multiple lines
                if (withinBlock && b != (byte) '\r' && b != (byte) '\n') {
                    buffer.write(b);
                }
                // check whether we are matching the tag, byte by byte
                if (b == match[i]) {
                    i++;
                    if (i >= match.length) {
                        return true;
                    }
                } else {
                    i = 0;
                }
                // past the end of the split and not in the middle of a match
                if (!withinBlock && i == 0 && fsin.getPos() >= end) {
                    return false;
                }
            }
        }

        @Override
        public float getProgress() throws IOException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        @Override
        public synchronized long getPos() throws IOException {
            return fsin.getPos();
        }

        @Override
        public LongWritable createKey() {
            return new LongWritable();
        }

        @Override
        public Text createValue() {
            return new Text();
        }

        @Override
        public synchronized void close() throws IOException {
            fsin.close();
        }
    }
}
Here's the output:
$ hdfs dfs -text /poc/testout001/part-*
25     <a><b><c>val1</c></b></a>
52     <a><b><c>val2</c></b></a>
80     <a><b><c>val3</c></b></a>
141    <a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>
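With each record now emitted on a single line, the ElementTree call that failed earlier parses the third record cleanly, for example:

import xml.etree.ElementTree as ET

# Record 3 as emitted by CustomXmlInputFormat, now on a single line:
root = ET.fromstring('<a><b><c>val3</c></b></a>')
print root.find('b/c').text   # -> val3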