bigdata - Hadoop Pig Output Directory not set -
i'm writing own pig store class don't want store in file, i'm planning send 3rd party info store (short of api calls).
note: i'm running on cloudera's virtualbox image.
i have written java classes (listed below) , created mystore.jar i'm using in below id.pig script:
store b 'mylocation' using mystore('mynewlocation')
while running script pig, see below errors: error 6000: output location validation failed for: 'file://home/cloudera/test/id.out more info follow: output directory not set.
or.apache.pig.impl.plan.visitorexception: error 6000: @ or.apache.pig.newplan.logical.rules.inputoutputfilevalidator$inputoutputfilevalidator.visit(inputoutputfilevalidator.java:95)
please help!
-------------------- mystore.java ----------------------
public class mystore extends storefunc { protected recordwriter author = null; private string location = null; public mystore () { location= null; } public mystore (string location) { this.location= location; } @override public outputformat getoutputformat() throws ioexception { homecoming new mystoreoutputformat(location); } @override public void preparetowrite(recordwriter writer) throws ioexception { this.writer = writer; } @override public void putnext(tuple tuple) throws ioexception { //write tuple location seek { writer.write(null, tuple.tostring()); } grab (interruptedexception e) { e.printstacktrace(); } } @override public void setstorelocation(string location, job job) throws ioexception { if(location!= null) this.location= location; } }
-------------------- mystoreoutputformat.java ----------------------
import java.io.dataoutputstream; import java.io.ioexception; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.fsdataoutputstream; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.writablecomparable; import org.apache.hadoop.mapreduce.recordwriter; import org.apache.hadoop.mapreduce.taskattemptcontext; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; import org.apache.pig.data.tuple; public class mystoreoutputformat extends textoutputformat<writablecomparable, tuple> { private string location = null; public mystoreoutputformat(string location) { this.location = location; } @override public recordwriter<writablecomparable, tuple> getrecordwriter( taskattemptcontext job) throws ioexception, interruptedexception { configuration conf = job.getconfiguration(); string extension = location; path file = getdefaultworkfile(job, extension); filesystem fs = file.getfilesystem(conf); fsdataoutputstream fileout = fs.create(file, false); homecoming new mystorerecordwriter(fileout); } protected static class mystorerecordwriter extends recordwriter<writablecomparable, tuple> { dataoutputstream out = null; public mystorerecordwriter(dataoutputstream out) { this.out = out; } @override public void close(taskattemptcontext taskcontext) throws ioexception, interruptedexception { // close location } @override public void write(writablecomparable key, tuple value) throws ioexception, interruptedexception { // write info location if (out != null) { out.writechars(value.tostring()); // calling api later. allow me first dump location! } } } }
am missing here?
firstly, think should using job configuration store location value, rather instance variable
your assignment local variable 'location' in setstorelocation method called when planning job, getoutputformat phone call may not made until execution phase, time location variable may no longer set (a new instance of class may have been created).
if @ source pigstorage.setstorelocation
, should notice store location in job configuration (2nd line):
@override public void setstorelocation(string location, job job) throws ioexception { job.getconfiguration().set("mapred.textoutputformat.separator", ""); fileoutputformat.setoutputpath(job, new path(location)); if( "true".equals( job.getconfiguration().get( "output.compression.enabled" ) ) ) { fileoutputformat.setcompressoutput( job, true ); string codec = job.getconfiguration().get( "output.compression.codec" ); seek { fileoutputformat.setoutputcompressorclass( job, (class<? extends compressioncodec>) class.forname( codec ) ); } grab (classnotfoundexception e) { throw new runtimeexception("class not found: " + codec ); } } else { // makes storing directory ending ".gz" or ".bz2" works. setcompression(new path(location), job); } }
so think should store location in job variable:
@override public void setstorelocation(string location, job job) throws ioexception { if(location!= null) job.getconfiguration().set("mylocation", location); }
which custom output format can extract in createrecordreader method:
@override public recordwriter<writablecomparable, tuple> getrecordwriter( taskattemptcontext job) throws ioexception, interruptedexception { configuration conf = job.getconfiguration(); string extension = conf.get("mylocation"); path file = getdefaultworkfile(job, extension); filesystem fs = file.getfilesystem(conf); fsdataoutputstream fileout = fs.create(file, false); homecoming new mystorerecordwriter(fileout); }
finally (and actual cause of error you're seeing), output format extends textoutputformat, , utilize getdefaultworkfile
method in record author - method needs know outputting file in hdfs, , haven't called fileoutputformat.setoutputpath(job, new path(location));
in setstorelocation method (see pigstorage.setstorelocation method pasted). error because doesn't know create default work file.
hadoop bigdata apache-pig
No comments:
Post a Comment