Chapter 11. Writing Load and Store Functions
We will now consider some of the more complex and most critical parts of Pig: data input and output. Operating on huge data sets is inherently I/O intensive. Hadoop's massive parallelism and its movement of processing to the data mitigate but do not remove this. Having efficient methods to load and store data is therefore critical. Pig provides default load and store functions for text data and for HBase. Many users find they need to write their own load and store functions to handle the data formats and storage mechanisms they use.
As with evaluation functions, the design goal for
load and store functions was to make easy things easy, and hard things
possible. Also, we wanted to make load and store functions a thin wrapper
over Hadoop's InputFormat and
OutputFormat. The intention is that once you have an
input format and output format for your data, the additional work of
creating and storing Pig tuples is minimal. In the same way evaluation
functions were implemented, more complex features such as schema management
and projection push down are done via separate interfaces to avoid
cluttering the base interface. Pig's load and store functions were
completely rewritten between versions 0.6 and 0.7. This chapter will only
cover the interfaces for 0.7 and later releases.
One other important design goal for load and store
functions is to not assume that the input sources and output sinks are HDFS.
So, while throughout this book in the examples A = load 'foo';
has implied that foo is a file, there is no need for that to be
the case. foo is a resource locator that makes sense to your
load function. It could be an HDFS file, an HBase table, a database JDBC
connection string, or a web service URL. Since reading from HDFS is the
most common case, many defaults and helper functions are provided for this
case.
In this chapter we will walk through writing a load
function and a store function for JSON data on HDFS,
JsonLoader and JsonStorage.
These are located in the example code in
udfs/java/com/acme/io. They use the Jackson JSON
library, which is included in your Pig distribution. However, the Jackson
jar is not shipped to the backend by Pig. So when using these UDFs in your
script you will need to register the Jackson jar in addition to the acme
examples jar.
register 'acme.jar';
register 'src/pig/trunk/build/ivy/lib/Pig/jackson-core-asl-1.6.0.jar';
These UDFs will serve as helpful examples, but will not cover all of the functionality of load and store functions. For those sections not shown in the example, we will look at other existing load and store functions for examples.
Load Functions
Pig's load function is built on top of a Hadoop
InputFormat, the class which Hadoop uses to read
data. InputFormat serves two functions: it
determines how input will be split between map tasks, and it provides a
RecordReader that produces key value pairs as input
to those map tasks. The load function takes these key value pairs and
returns a Pig Tuple.
The base class for the load function is
LoadFunc. This is an abstract class which allows it
to provide helper functions and default implementations. Many load
functions will only need to extend LoadFunc.
Load functions' operations are split between Pig's
front end and back end. On the front end Pig does job planning and
optimization, and load functions participate in this in several ways that
we will discuss. On the back end load functions get each record from the
RecordReader, convert it to a tuple, and pass it on
to Pig's map task. Load functions also need to be able to pass data
between their front end and back end invocations so they can
maintain state.
Front End Planning Functions
For all load functions, Pig must do three things as part of front end planning: 1) it needs to know the input format it should use to read the data; 2) it needs to be sure that the load function understands where its data is located; and 3) it needs to know how to cast bytearrays returned from the load function.
Determining InputFormat
Pig needs to know which
InputFormat to use for reading your input. It
calls getInputFormat to get an instance of the input
format. It gets an instance rather than the class itself so that your
load function can control the instantiation: any generic parameters,
constructor arguments, etc. For our example load function, this
method is very simple. It uses TextInputFormat,
an input format that reads text data from HDFS files.
// JsonLoader.java
public InputFormat getInputFormat() throws IOException {
return new TextInputFormat();
}
Determining the Location
Pig communicates the location string provided
by the user to the load function via setLocation. So if
the load operator in Pig Latin is A = load 'input'; then
“input” is the location string. This method is called on
both the front and back ends, possibly multiple times. Thus you need
to take care that this method does not do anything that will cause
problems if done more than one time. Your load function should
communicate the location to its input format. For example,
JsonLoader passes the filename via a helper
method on FileInputFormat (a super class of
TextInputFormat).
// JsonLoader.java
public void setLocation(String location, Job job) throws IOException {
FileInputFormat.setInputPaths(job, location);
}
The Hadoop Job is
passed along with the location, since the Job is where input
formats usually store their configuration information.
setLocation is called on both the
front and back ends because input formats store their location in the
Job object, as shown in the above example. This works for MapReduce jobs,
which always have only one input. For Pig jobs, where the same
input format may be used to load multiple different inputs, such as in
the join or union case, one instance of the input path will overwrite
another in the Job object. To work around this, Pig remembers the
location in an input specific parameter, and calls
setLocation again on the backend so that the input format
can get itself set up properly before reading.
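The reason setLocation must be safe to call repeatedly can be illustrated without Hadoop. In this minimal sketch a plain Map stands in for the Job configuration, and the class LocationDemo and the key input.dir are hypothetical. Overwriting a value, in the spirit of FileInputFormat.setInputPaths, can be repeated harmlessly; appending to it, in the spirit of FileInputFormat.addInputPath, cannot.

```java
import java.util.Map;

// Hypothetical stand-in for a Hadoop Job configuration: a plain string map.
class LocationDemo {
    // Hypothetical configuration key; real input formats use their own keys.
    static final String KEY = "input.dir";

    // Idempotent: overwriting means a repeated call leaves the same state,
    // in the spirit of FileInputFormat.setInputPaths.
    static void setLocation(Map<String, String> conf, String location) {
        conf.put(KEY, location);
    }

    // Not idempotent: each call appends, so calling it again on the back end
    // would list the same input twice (compare FileInputFormat.addInputPath).
    static void addLocation(Map<String, String> conf, String location) {
        String old = conf.get(KEY);
        conf.put(KEY, old == null ? location : old + "," + location);
    }
}
```

Calling setLocation twice with "input" leaves "input"; calling addLocation twice leaves "input,input", which is exactly the kind of side effect a load function's setLocation must avoid.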
For files on HDFS the location provided by the user may be a relative location, rather than absolute. To deal with this Pig needs to resolve these to absolute locations based on the current working directory at the time of the load. Consider the following Pig Latin:
cd /user/joe;
input1 = load 'input';
cd /user/fred;
input2 = load 'input';
These two load statements should load
different files. But Pig cannot assume it understands how to turn a
relative path into an absolute path, because it does not know what
that input is. It could be an HDFS path, a database table name, etc.
So it leaves this to the load function. Before calling
setLocation, Pig passes the location string to
relativeToAbsolutePath to do any necessary conversion.
Since most loaders are reading from HDFS, the default implementation
in LoadFunc handles the HDFS case. If your load function will
never need to do this conversion, it should override this method and
return the location string passed to it.
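A simplified sketch of the HDFS-style rule, using plain strings rather than Hadoop's Path and FileSystem classes. PathResolver and both of its methods are hypothetical; the actual default in LoadFunc works with Hadoop paths and may handle cases this sketch ignores, so treat it as an illustration of the rule rather than Pig's code.

```java
// Hypothetical helper illustrating the idea behind relativeToAbsolutePath.
class PathResolver {
    // HDFS-style resolution: absolute paths and URIs with a scheme pass
    // through unchanged; relative paths are joined to the working directory.
    static String relativeToAbsolute(String location, String cwd) {
        if (location.startsWith("/") || location.contains("://")) {
            return location;
        }
        return cwd.endsWith("/") ? cwd + location : cwd + "/" + location;
    }

    // A loader whose locations are not file paths (a table name, a JDBC
    // string, a URL) would instead return the location it was given.
    static String noConversion(String location, String cwd) {
        return location;
    }
}
```

For the script above, resolving "input" against /user/joe and /user/fred yields /user/joe/input and /user/fred/input, the two different files the loads are meant to read.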
Getting the Casting Functions
Some Pig functions, such as
PigStorage and
HBaseStorage, load data by default without
understanding its type information. They place the data unchanged in
DataByteArray objects. At a later time when
Pig needs to cast that data to another type, it does not know how to,
since it does not understand how the data is represented in the
bytearray. Therefore it relies on the load function to provide a
method to cast from bytearray to the appropriate type.
Pig determines which set of casting functions
to use by calling getLoadCaster on the load function.
This should return either null, which indicates that your
load function does not expect to do any bytearray casts, or an
implementation of the LoadCaster interface,
which will be used to do the casts. We will look at the methods of
LoadCaster in the section called “Casting Bytearrays” below.
Our example loader returns null,
because it provides typed data based on the stored schema, and
therefore does not expect to be casting data. Any bytearrays in its
data are binary data that should not be cast.
Passing Information from the Front End to the Back End
As with evaluation functions, load functions can
make use of UDFContext to pass information from
front end invocations to back end invocations. For details on
UDFContext see the section called “UDFContext”.
One significant difference between using
UDFContext in evaluation and load functions is
determining the instance-specific signature of the function. In
evaluation functions constructor arguments were suggested as a way to do
this. For load functions the input location will usually be the
differentiating factor. However, LoadFunc does
not guarantee that it will call setLocation before other
methods where you might want to use the
UDFContext. To work around this
setUDFContextSignature is provided. It provides an
instance-unique signature that you can use when calling
getUDFProperties. This method is guaranteed to be called
before any other methods on LoadFunc in both the
front and back ends. Your UDF can then store this signature and use it
when getting its property object.
// JsonLoader.java
private String udfcSignature = null;
public void setUDFContextSignature(String signature) {
udfcSignature = signature;
}
setLocation is the only method in
the load function that is guaranteed to be called on the front end. It
is therefore the best candidate for storing needed information to
UDFContext. You may need to check that the data
you are writing is available and non-null, to avoid over-writing your
values when setLocation is called on the backend.
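A toy model of how the signature keeps per-instance state apart. ToyUDFContext and ToyLoader are hypothetical; Pig's real UDFContext.getUDFProperties takes a Class and a String array, as the JsonLoader code shows. The sketch also applies the advice about not overwriting: it records a value only when none is stored yet, so a repeated call on the back end leaves the front end value intact.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Toy stand-in for UDFContext: one Properties object per (class, signature).
class ToyUDFContext {
    private final Map<String, Properties> store = new HashMap<>();

    Properties getUDFProperties(Class<?> c, String signature) {
        return store.computeIfAbsent(c.getName() + "#" + signature,
                                     key -> new Properties());
    }
}

// Toy loader that remembers its signature, as JsonLoader does.
class ToyLoader {
    private String udfcSignature;

    void setUDFContextSignature(String signature) {
        udfcSignature = signature;
    }

    // Store the schema only if nothing is stored yet, so a second call
    // (for example on the back end) does not clobber the front end value.
    void rememberSchema(ToyUDFContext ctx, String schema) {
        Properties p = ctx.getUDFProperties(getClass(), udfcSignature);
        if (schema != null && p.getProperty("pig.jsonloader.schema") == null) {
            p.setProperty("pig.jsonloader.schema", schema);
        }
    }

    String schema(ToyUDFContext ctx) {
        return ctx.getUDFProperties(getClass(), udfcSignature)
                  .getProperty("pig.jsonloader.schema");
    }
}
```

Two instances of the same loader class with different signatures get separate property objects, which is what lets two load statements in one script keep different schemas.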
Back End Data Reading
On the backend, your load function takes the key
value pairs produced by its input format and produces Pig
Tuples.
Getting Ready to Read
Before reading any data, Pig gives your load
function a chance to set itself up by calling
prepareToRead. This is called in each map task. It
passes a copy of the RecordReader, which your
load function will need later to read records from the input.
RecordReader is a class that
InputFormat uses to read records from an input
split. Pig obtains the record reader it passes to
prepareToRead by calling getRecordReader on
the input format that your load function returned from
getInputFormat. It also passes an instance of the
PigSplit which contains the Hadoop
InputSplit corresponding to the partition of
input this instance of your load function will read. If you need
split-specific information you can get it from here.
Our example loader, beyond storing the record
reader, also reads the schema file that was stored into
UDFContext in the front end so that it knows
how to parse the input file. Notice how it uses the signature passed
in setUDFContextSignature to access the appropriate
properties object. Finally, it creates a
JsonFactory object that is used to generate a
parser for each line.
// JsonLoader.java
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
this.reader = reader;
// Get the schema string from the UDFContext object.
UDFContext udfc = UDFContext.getUDFContext();
Properties p =
udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
String strSchema = p.getProperty("pig.jsonloader.schema");
if (strSchema == null) {
throw new IOException("Could not find schema in UDF context");
}
// Parse the schema from the string stored in the properties object.
ResourceSchema schema =
new ResourceSchema(Utils.getSchemaFromString(strSchema));
fields = schema.getFields();
jsonFactory = new JsonFactory();
}
Reading Records
Now we come to the meat of your load function:
reading records from its record reader and returning tuples to Pig.
Pig will call getNext and place the resulting tuple into
its processing pipeline. It will keep doing this until
getNext returns a null, which indicates that
the input for this split has been fully read.
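The contract can be sketched with the standard library alone. Here a BufferedReader plays the record reader and a List<Object> plays the Tuple; ToyTabLoader is hypothetical and simply splits each line on tabs. Pig's side of the contract is the readAll loop: keep calling getNext until it returns null.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy loader: a BufferedReader plays the record reader, a List<Object>
// plays the Tuple. getNext returns null once the input is exhausted.
class ToyTabLoader {
    private final BufferedReader reader;

    ToyTabLoader(BufferedReader reader) {
        this.reader = reader;
    }

    List<Object> getNext() throws IOException {
        String line = reader.readLine();
        if (line == null) return null; // end of this split
        // A fresh list per record: nothing is shared between tuples.
        return new ArrayList<Object>(Arrays.asList(line.split("\t", -1)));
    }

    // The driver loop Pig conceptually runs in each map task.
    static List<List<Object>> readAll(ToyTabLoader loader) throws IOException {
        List<List<Object>> out = new ArrayList<>();
        List<Object> t;
        while ((t = loader.getNext()) != null) {
            out.add(t);
        }
        return out;
    }
}
```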
Pig does not copy the tuple that results from this method, but feeds it directly to its pipeline to avoid the copy overhead. This means this method cannot re-use objects, but must create a new tuple and contents for each record it reads. On the other hand, record readers may choose to re-use their key and value objects from record to record; most standard implementations do. So, before writing a loader that tries to be efficient and wrap the keys and values from the record reader directly into the tuple to avoid a copy, you must make sure you understand how the record reader is managing its data.
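To see why this matters, consider a toy record reader that reuses one mutable buffer, much as many Hadoop readers reuse their value object. ReusingReader is hypothetical. Storing the reader's object directly in each result makes every result alias the same buffer, so all of them end up holding the last record; copying the contents gives each result its own data.

```java
import java.util.ArrayList;
import java.util.List;

// Toy record reader that reuses one mutable value object for every record.
class ReusingReader {
    private final String[] records;
    private final StringBuilder value = new StringBuilder(); // reused buffer
    private int pos = 0;

    ReusingReader(String... records) {
        this.records = records;
    }

    boolean nextKeyValue() {
        if (pos >= records.length) return false;
        value.setLength(0);
        value.append(records[pos++]);
        return true;
    }

    StringBuilder getCurrentValue() {
        return value;
    }

    // Wrapping the reader's object directly: every result aliases one buffer.
    static List<CharSequence> wrapWithoutCopy(ReusingReader r) {
        List<CharSequence> out = new ArrayList<>();
        while (r.nextKeyValue()) out.add(r.getCurrentValue());
        return out;
    }

    // Copying the contents: each result owns its own data.
    static List<CharSequence> wrapWithCopy(ReusingReader r) {
        List<CharSequence> out = new ArrayList<>();
        while (r.nextKeyValue()) out.add(r.getCurrentValue().toString());
        return out;
    }
}
```

Reading "a", "b", "c" with wrapWithoutCopy yields three references to a buffer holding "c"; wrapWithCopy yields "a", "b", "c".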
For information on creating the appropriate Java objects when constructing tuples for Pig, see the section called “Interacting with Pig Values”.
Our sample load function's implementation of
getNext reads the value from the Hadoop record (the key
is ignored), constructs a JsonParser to
parse it, parses the fields, and returns the resulting tuple. If
there are parse errors it does not throw an exception. Instead it
returns a tuple with null fields where the data could not be parsed.
This prevents bad lines from causing the whole job to fail. Warnings
are issued so that users can see which records were ignored.
// JsonLoader.java
public Tuple getNext() throws IOException {
Text val = null;
try {
// Read the next key value pair from the record reader. If it's
// finished, return null
if (!reader.nextKeyValue()) return null;
// Get the current value. We don't use the key.
val = (Text)reader.getCurrentValue();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
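// Create a parser specific for this input line. This may not be the
// most efficient approach.
// Text.getBytes() returns the backing array, which can be longer than
// the valid data, so bound the stream with getLength().
ByteArrayInputStream bais =
new ByteArrayInputStream(val.getBytes(), 0, val.getLength());
JsonParser p = jsonFactory.createJsonParser(bais);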
// Create the tuple we will be returning. We create it with the right
// number of fields, as the Tuple object is optimized for this case.
Tuple t = tupleFactory.newTuple(fields.length);
// Read the start object marker. Throughout this file if the parsing
// isn't what we expect we return a tuple with null fields rather than
// throwing an exception. That way a few mangled lines don't fail the job.
if (p.nextToken() != JsonToken.START_OBJECT) {
log.warn("Bad record, could not find start of record " + val.toString());
return t;
}
// Read each field in the record
for (int i = 0; i < fields.length; i++) {
t.set(i, readField(p, fields[i], i));
}
if (p.nextToken() != JsonToken.END_OBJECT) {
log.warn("Bad record, could not find end of record " +
val.toString());
return t;
}
p.close();
return t;
}
private Object readField(JsonParser p,
ResourceFieldSchema field,
int fieldnum) throws IOException {
// Read the next token, which should be this field's name
JsonToken tok = p.nextToken();
if (tok == null) {
log.warn("Early termination of record, expected " + fields.length
+ " fields but found " + fieldnum);
return null;
}
// Move from the field name to the field's value
tok = p.nextToken();
// Check to see if this value was null
if (tok == JsonToken.VALUE_NULL) return null;
// Read based on our expected type
switch (field.getType()) {
case DataType.INTEGER:
return p.getValueAsInt();
case DataType.LONG:
return p.getValueAsLong();
case DataType.FLOAT:
return (float)p.getValueAsDouble();
case DataType.DOUBLE:
return p.getValueAsDouble();
case DataType.BYTEARRAY:
byte[] b = p.getBinaryValue();
// Use the DBA constructor that copies the bytes so that we own
// the memory
return new DataByteArray(b, 0, b.length);
case DataType.CHARARRAY:
return p.getText();
case DataType.MAP:
// Should be a start of the map object
if (tok != JsonToken.START_OBJECT) {
log.warn("Bad map field, could not find start of object, field "
+ fieldnum);
return null;
}
Map<String, String> m = new HashMap<String, String>();
while (p.nextToken() != JsonToken.END_OBJECT) {
// The current token is the key; advance to its value
String k = p.getCurrentName();
p.nextToken();
String v = p.getText();
m.put(k, v);
}
return m;
case DataType.TUPLE:
if (tok != JsonToken.START_OBJECT) {
log.warn("Bad tuple field, could not find start of object, "
+ "field " + fieldnum);
return null;
}
ResourceSchema s = field.getSchema();
ResourceFieldSchema[] fs = s.getFields();
Tuple t = tupleFactory.newTuple(fs.length);
for (int j = 0; j < fs.length; j++) {
t.set(j, readField(p, fs[j], j));
}
if (p.nextToken() != JsonToken.END_OBJECT) {
log.warn("Bad tuple field, could not find end of object, "
+ "field " + fieldnum);
return null;
}
return t;
case DataType.BAG:
if (tok != JsonToken.START_ARRAY) {
log.warn("Bad bag field, could not find start of array, "
+ "field " + fieldnum);
return null;
}
s = field.getSchema();
fs = s.getFields();
// Drill down the next level to the tuple's schema.
s = fs[0].getSchema();
fs = s.getFields();
DataBag bag = bagFactory.newDefaultBag();
JsonToken innerTok;
while ((innerTok = p.nextToken()) != JsonToken.END_ARRAY) {
if (innerTok != JsonToken.START_OBJECT) {
log.warn("Bad bag tuple field, could not find start of "
+ "object, field " + fieldnum);
return null;
}
t = tupleFactory.newTuple(fs.length);
for (int j = 0; j < fs.length; j++) {
t.set(j, readField(p, fs[j], j));
}
if (p.nextToken() != JsonToken.END_OBJECT) {
log.warn("Bad bag tuple field, could not find end of "
+ "object, field " + fieldnum);
return null;
}
bag.add(t);
}
return bag;
default:
throw new IOException("Unknown type in input schema: " +
field.getType());
}
}
Additional Load Function Interfaces
Your load function can provide more complex features by implementing additional interfaces. Implementation of these interfaces is optional.
Loading Metadata
Many data storage mechanisms can record the schema along with the data. Pig does not assume the ability to store schemas. But if your storage can store the schema, it can be very useful. This frees script writers from needing to specify the field names and types as part of the load operator in Pig Latin. This is user friendly, less error prone, and avoids the need to rewrite scripts when the schema of your data changes.
Some types of data storage also partition the data. If Pig understands this partitioning, it can load only those partitions that are needed for a particular script. Both of these functions are enabled by implementing the LoadMetadata interface.
getSchema in the
LoadMetadata interface gives your load function
a chance to provide a schema. It is passed the location string
provided by the user as well as the Hadoop Job object, in case it
needs information in this object to open the schema. It is expected
to return a ResourceSchema that represents the
data that will be returned. ResourceSchema is
very similar to the Schema class used by
evaluation functions. See the section called “Input and Output Schemas” for
details. There is one important difference. In
ResourceFieldSchema, the schema object
associated with a bag always has one field, which is a tuple. The
schema for the tuples in the bag is described by that tuple's
ResourceFieldSchema.
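The bag rule means code that walks a schema must descend two levels to reach a bag's columns, which is what the JsonLoader code does with s = fs[0].getSchema(). This sketch uses a hypothetical ToyFieldSchema class in place of ResourceFieldSchema to show that drill-down in isolation.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of ResourceFieldSchema: a name, a type
// and an optional list of nested fields.
class ToyFieldSchema {
    final String name;
    final String type;                 // "int", "chararray", "tuple", "bag", ...
    final List<ToyFieldSchema> fields; // null when there is no nested schema

    ToyFieldSchema(String name, String type, ToyFieldSchema... fields) {
        this.name = name;
        this.type = type;
        this.fields = fields.length == 0 ? null : Arrays.asList(fields);
    }

    // A bag's nested schema holds exactly one field, a tuple; the columns
    // live one level further down, in that tuple's schema.
    static List<ToyFieldSchema> bagColumns(ToyFieldSchema bag) {
        if (!"bag".equals(bag.type) || bag.fields == null || bag.fields.size() != 1) {
            throw new IllegalArgumentException("not a well-formed bag schema");
        }
        ToyFieldSchema tuple = bag.fields.get(0);
        return tuple.fields;
    }
}
```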
Our example load and store functions store the
schema in a side file[27] named
_schema in HDFS. Our implementation of
getSchema reads this file. It also serializes the schema
into UDFContext so that it is available on the
back end.
// JsonLoader.java
public ResourceSchema getSchema(String location, Job job)
throws IOException {
// Open the schema file and read the schema
// Get an HDFS handle.
FileSystem fs = FileSystem.get(job.getConfiguration());
DataInputStream in = fs.open(new Path(location + "/_schema"));
String line = in.readLine();
in.close();
// An empty schema file yields no line; fail with a clear message
if (line == null) {
throw new IOException("Unable to find schema in file " +
location + "/_schema");
}
// Parse the schema
ResourceSchema s = new ResourceSchema(Utils.getSchemaFromString(line));
// Now that we have determined the schema, store it in our
// UDFContext properties object so we have it when we need it on the
// backend
UDFContext udfc = UDFContext.getUDFContext();
Properties p =
udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
p.setProperty("pig.jsonloader.schema", line);
return s;
}
Once your loader implements
getSchema, load statements that use your
loader do not need to declare their schemas in order for the field
names to be used in the script. For example, if we had data with a
schema of user:chararray, age:int, gpa:double, the
following Pig Latin will compile and run:
register 'acme.jar';
register 'src/pig/trunk/build/ivy/lib/Pig/jackson-core-asl-1.6.0.jar';
A = load 'input' using com.acme.io.JsonLoader();
B = foreach A generate user;
dump B;
LoadMetadata also
includes a getStatistics method. Pig does not yet make
use of statistics in job planning; this method is for future
use.
Using Partitions
Some types of storage partition their data,
allowing you to read only the relevant sections for a given job. The
LoadMetadata interface also provides methods
for working with partitions in your data. In
order for Pig to request the relevant partitions, it must know how the
data is partitioned. Pig determines this by calling
getPartitionKeys. If this returns a null,
or the LoadMetadata interface is not
implemented by your loader, then Pig will assume it needs to read the
entire input.
Pig expects getPartitionKeys to
return an array of strings, where each string represents one field
name. Those fields are the keys used to partition the data. Pig will
look for a filter statement immediately following the
load statement that includes one or more of these fields.
If such a statement is found, it will be passed to
setPartitionFilter. If the filter includes
both partition and non-partition keys and it can be
split,[28] Pig will split it and
pass just the partition key-related expression to
setPartitionFilter. As an example, consider an
HCatalog[29] table web_server_logs
that is partitioned by two fields, date and
colo.
logs = load 'web_server_logs' using HCatLoader();
cleaned = filter logs by date = '20110614' and NotABot(user_id);
...
Pig will call getPartitionKeys
and HCatLoader will return two key names, date and
colo. Pig will find the date
field in the filter statement and rewrite the filter as shown
below, pushing down the date = '20110614' predicate to
HCatLoader via setPartitionFilter:
logs = load 'web_server_logs' using HCatLoader();
cleaned = filter logs by NotABot(user_id);
...
It is now up to the HCatalog loader to ensure
that it only returns data from web_server_logs
where date is 20110614.
The one exception to this is fields used in eval funcs or filter funcs. Pig assumes that loaders do not understand how to invoke UDFs, so Pig will not push these expressions down.
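A simplified sketch of the decision Pig makes when splitting a filter. It treats the filter as a flat list of ANDed conjuncts, each tagged with the fields it references and whether it calls a UDF; the Conjunct class is hypothetical, and Pig's optimizer works on expression trees rather than this flattened form. A conjunct goes to the loader only if it references nothing but partition keys and calls no UDF.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of one conjunct in a filter's AND chain.
class Conjunct {
    final String text;        // e.g. "date = '20110614'"
    final Set<String> fields; // fields the conjunct references
    final boolean callsUdf;   // true if it invokes an eval or filter func

    Conjunct(String text, Set<String> fields, boolean callsUdf) {
        this.text = text;
        this.fields = fields;
        this.callsUdf = callsUdf;
    }

    // Returns [pushed, kept]: conjuncts the loader can handle via
    // setPartitionFilter, and conjuncts Pig must still evaluate itself.
    static List<List<Conjunct>> split(List<Conjunct> conjuncts, Set<String> partitionKeys) {
        List<Conjunct> pushed = new ArrayList<>();
        List<Conjunct> kept = new ArrayList<>();
        for (Conjunct c : conjuncts) {
            if (!c.callsUdf && partitionKeys.containsAll(c.fields)) {
                pushed.add(c);
            } else {
                kept.add(c);
            }
        }
        List<List<Conjunct>> result = new ArrayList<>();
        result.add(pushed);
        result.add(kept);
        return result;
    }
}
```

With the web_server_logs example, date = '20110614' is pushed while NotABot(user_id) stays in Pig's filter; a predicate such as IsRecent(date), a hypothetical UDF call, would also stay, because of the UDF exception.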
Our example loader does not implement
getPartitionKeys or setPartitionFilter,
since it works on file data. For an example implementation of these
methods, see the HCatalog code at http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java?view=markup.
Casting Bytearrays
If you need to control how binary data that
your loader loads is cast to other data types you can implement the
LoadCaster interface. Since this interface
contains a lot of methods, implementers often implement it as a
separate class. This also allows load functions to share
implementations of LoadCaster, since Java does
not support multiple inheritance.
The interface consists of a series of methods:
bytesToInteger, bytesToLong, etc. These
will be called to convert a bytearray to the appropriate type.
Starting in 0.9, there are two bytesToMap methods. You
should implement the one that takes a
ResourceFieldSchema. The other one is for backward
compatibility. The bytesToBag,
bytesToTuple, and bytesToMap methods take a
ResourceFieldSchema that describes the field being
converted. Calling getSchema on this object will return
a schema that describes this bag, tuple, or map, if one exists. If
Pig does not know the intended structure of the object,
getSchema will return null. Keep in mind that the schema
of the bag will be one field, a tuple, which in turn will have a
schema describing the contents of that tuple.
A default load caster
Utf8StorageConverter is provided. It handles
converting UTF8 encoded text to Pig types. Scalar conversions are
done in a straightforward way. Maps are expected to be surrounded by
“[]” (square brackets), have keys separated from values
with “#” (hash), and have key value pairs separated by
“,” (commas). Tuples are surrounded by “()”
(parentheses) and have fields separated by “,” (commas).
Bags are surrounded by “{}” (braces) and have tuples
separated by “,” (commas). There is no ability to escape
these special characters.
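A minimal sketch of parsing the map syntax just described. ToyMapCaster is hypothetical and is not Utf8StorageConverter: it handles flat maps only, keeps values as strings, and returns null for input it cannot parse. Because the format has no escaping, a key or value containing [, ], # or , would be misparsed by this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified parser for the text map syntax [key#value,key#value].
// Flat maps only; nested values are out of scope for this sketch.
class ToyMapCaster {
    static Map<String, String> bytesToMap(byte[] b) {
        String s = new String(b, StandardCharsets.UTF_8).trim();
        if (s.length() < 2 || s.charAt(0) != '[' || s.charAt(s.length() - 1) != ']') {
            return null; // not a map; this sketch signals failure with null
        }
        Map<String, String> m = new LinkedHashMap<>();
        String body = s.substring(1, s.length() - 1);
        if (body.isEmpty()) return m;
        for (String pair : body.split(",")) {
            int hash = pair.indexOf('#');
            if (hash < 0) return null; // malformed key value pair
            m.put(pair.substring(0, hash), pair.substring(hash + 1));
        }
        return m;
    }
}
```

For example, the bytes of [name#joe,age#42] become a two-entry map from name to joe and from age to 42.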
Pushing Down Projections
Often a Pig Latin script will only need to read a few of the fields in the input. Some types of storage formats store their data by fields instead of by records (for example Hive's RCFile). For these types of formats there is a significant performance gain to be had by only loading fields that will be used in the script. Even for record oriented storage formats it can be useful to skip de-serializing fields that will not be used.
As part of its optimizations Pig analyzes Pig Latin scripts and determines what fields in an input it needs at each step in the script. It uses this information to aggressively drop fields it no longer needs. If the loader implements the LoadPushDown interface, Pig can go a step further and provide this information to the loader.
Once Pig knows the fields it needs, it
assembles them in a RequiredFieldList and
passes that to pushProjection. The load function can
reply whether it can meet the request or not. It responds with a
RequiredFieldResponse, which is a fancy wrapper
around a Boolean. If the Boolean is true, then Pig will assume that
only the required fields are being returned from getNext.
If it is false, then Pig will assume that all fields are being
returned by getNext and it will handle dropping the extra
ones itself.
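A hypothetical loader fragment illustrating that contract. ProjectingLoader, its int[] column indexes (standing in for a RequiredFieldList) and its boolean result (standing in for RequiredFieldResponse) are all assumptions of the sketch, as is returning the requested fields in the order requested. When it accepts a request it returns only those fields; when it declines, it keeps returning whole records and leaves the pruning to Pig.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical loader fragment showing the projection push down contract.
class ProjectingLoader {
    private int[] requiredColumns = null; // null means "return every field"

    // Returns true if the request is honored, like the Boolean wrapped by
    // RequiredFieldResponse. This toy accepts any in-range request.
    boolean pushProjection(int[] columns, int width) {
        for (int c : columns) {
            if (c < 0 || c >= width) return false; // decline; Pig prunes itself
        }
        requiredColumns = columns.clone();
        return true;
    }

    // Shape one record: all fields, or only the requested ones in order.
    List<Object> toTuple(Object[] record) {
        if (requiredColumns == null) return new ArrayList<>(Arrays.asList(record));
        List<Object> t = new ArrayList<>(requiredColumns.length);
        for (int c : requiredColumns) t.add(record[c]);
        return t;
    }
}
```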
The RequiredField class
used to describe which fields are required is slightly complex.
Beyond allowing a user to specify whether a given field is required,
it provides the ability to specify which sub-fields of that field are
required. For example, for maps certain keys can be listed as
required. For tuples and bags, certain fields can be listed as
required.
Load functions that implement
LoadPushDown should not modify the schema
object returned by getSchema. This should always be the
schema of the full input. Pig will manage the translation between the
schema having all of the fields and the results of
getNext having only some.
Our example loader does not implement
LoadPushDown. For an example of a loader that
does, see the HCatLoader at http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java?view=markup.
Store Functions
Pig's store function is, in many ways, a mirror
image of the load function. It is built on top of Hadoop's
OutputFormat. It takes Pig
Tuples and creates key value pairs that its
associated output format writes to storage.
StoreFunc is an abstract
class. This allows it to provide default implementations for some
methods. However, some functions implement both load and store
functionality; PigStorage is an example. Since Java does not
support multiple inheritance, the interface
StoreFuncInterface is provided. These dual
load/store functions can implement this interface rather than extending
StoreFunc.
Store function operations are split between the
front end and back end of Pig. Pig does planning and optimization on the
front end. Store functions have an opportunity at this time to check that
a valid schema is being used and set up the storage location. On the back
end store functions take a tuple from Pig, convert it to a key value pair,
and pass it to a Hadoop RecordWriter. Store
functions can pass information from front end invocations to back end
invocations via UDFContext.
Store Function Front End Planning
Store functions have three tasks to fulfill on
the front end: 1) they have to instantiate the
OutputFormat they will use to store data; 2) they
have to check the schema of the data being stored; and 3) they need to
record the location the data will be stored.
Determining OutputFormat
Pig calls getOutputFormat to get
an instance of the output format that your store function will use to
store records. This method returns an instance rather than the
classname or the class itself. This allows your store function to
control how the class is instantiated. Our example store function,
JsonStorage uses
TextOutputFormat. This is an output format
that stores text data in HDFS. We have to instantiate this with a key
of LongWritable and a value of
Text to match the expectations of
TextInputFormat.
// JsonStorage.java
public OutputFormat getOutputFormat() throws IOException {
return new TextOutputFormat<LongWritable, Text>();
}
Setting the Output Location
Pig calls setStoreLocation to
communicate the location string provided by the user to your store
function. Given the Pig Latin store Z into 'output';,
“output” is the location string. This method is called
on both the front end and the back end and could be called multiple
times. For this reason it should not have any side effects that will
cause a problem if they happen multiple times. Your store function
will need to communicate the location to its output format. Our
example store function uses the
FileOutputFormat utility function
setOutputPath to do this.
// JsonStorage.java
public void setStoreLocation(String location, Job job) throws IOException {
FileOutputFormat.setOutputPath(job, new Path(location));
}
The Hadoop Job is
passed to this function as well. Most output formats store the
location information in the job.
Pig calls setStoreLocation on
both the front and back end because output formats usually store their
location in the job, as we see in our example store function. This
works for MapReduce jobs, where a single output format is guaranteed.
But due to the split operator, Pig can have more than one
instance of the same store function in a job. If multiple instances
of a store function call FileOutputFormat.setOutputPath,
whichever instance calls it last will overwrite the others. Pig
avoids this by keeping output-specific information and calling
setStoreLocation again on the backend so that it can
properly configure the output format.
For HDFS files the user may provide a relative
path. Pig needs to resolve these to absolute paths using the current
working directory at the time the store is called. To accomplish this
Pig calls relToAbsPathForStoreLocation with the
user-provided location string before calling
setStoreLocation. This method translates between
relative and absolute paths. For store functions writing to HDFS, the
default implementation in StoreFunc handles the
conversion. If you are writing a store function that does not use
file paths, e.g. HBase, you should override this method to return the
string it is passed.
Checking the Schema
As part of front end planning Pig gives your
store function a chance to check the schema of the data to be stored.
If you are storing data to a system that expects a certain schema for
the output, such as an RDBMS, or you cannot store certain data types,
this is the place to perform those checks. Oddly enough this method
returns a void rather than a Boolean. So if you detect an issue with
the schema you must throw an IOException.
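A sketch of such a check, assuming a hypothetical target that accepts only scalar columns. Fields are given as simple "name:type" strings in place of a ResourceSchema, and ScalarOnlyCheck is hypothetical. Since checkSchema returns void, the only way to reject a schema is to throw an IOException.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical store-side check for a target that accepts only scalars.
class ScalarOnlyCheck {
    static final Set<String> SUPPORTED =
        new HashSet<>(Arrays.asList("int", "long", "float", "double", "chararray"));

    // Each entry is "name:type", a simplified stand-in for ResourceFieldSchema.
    // Returns normally if every type is supported; throws otherwise.
    static void checkSchema(List<String> fields) throws IOException {
        for (String f : fields) {
            String type = f.substring(f.indexOf(':') + 1);
            if (!SUPPORTED.contains(type)) {
                throw new IOException("Cannot store field " + f
                    + ": unsupported type " + type);
            }
        }
    }
}
```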
Our example store function does not have
limitations on the schemas it can store. However, it uses this
function as a place to serialize the schema into
UDFContext so that it can be used on the back
end when writing data.
// JsonStorage.java
public void checkSchema(ResourceSchema s) throws IOException {
UDFContext udfc = UDFContext.getUDFContext();
Properties p =
udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
p.setProperty("pig.jsonstorage.schema", s.toString());
}
Store Functions and UDFContext
Store functions work with
UDFContext exactly as load functions do, with one
exception: the signature is passed to a store function via
setStoreFuncUDFContextSignature rather than setUDFContextSignature. See the section called “Passing Information from the Front End to the Back End” for a discussion of how load functions
work with UDFContext. Our example store function
stores the signature in a member variable for later use.
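The signature matters because a script may contain several store statements that use the same store function class. Each instance must find its own properties on the back end. The sketch below uses a hypothetical in-memory stand-in for UDFContext, not its real implementation, that keys a Properties object by class name plus signature. It also mimics the round trip JsonStorage performs: checkSchema writes the schema string on the front end, and prepareToWrite reads it back. In real Pig the two ends run in different JVMs, and UDFContext carries the properties between them through the job configuration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for UDFContext: one Properties per (class, signature).
final class MiniUdfContext {
    private final Map<String, Properties> props = new HashMap<>();

    Properties getUDFProperties(Class<?> c, String signature) {
        // Create an empty Properties object on first access.
        return props.computeIfAbsent(c.getName() + "#" + signature,
                                     k -> new Properties());
    }
}

// Hypothetical store function fragment mirroring JsonStorage's usage.
final class SketchStorage {
    private final MiniUdfContext ctx;
    private String udfcSignature;

    SketchStorage(MiniUdfContext ctx) { this.ctx = ctx; }

    void setStoreFuncUDFContextSignature(String signature) {
        udfcSignature = signature;
    }

    // Front end: remember the schema under this instance's signature.
    void checkSchema(String schema) {
        ctx.getUDFProperties(getClass(), udfcSignature)
           .setProperty("pig.jsonstorage.schema", schema);
    }

    // Back end: read it back using the same signature.
    String schemaForWrite() {
        return ctx.getUDFProperties(getClass(), udfcSignature)
                  .getProperty("pig.jsonstorage.schema");
    }
}
```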
// JsonStorage.java
public void setStoreFuncUDFContextSignature(String signature) {
udfcSignature = signature;
}
Writing Data
During back end processing, the store function is first prepared for writing, and then it takes Pig tuples and converts them to key-value pairs to be written to storage.
Preparing to Write
Pig calls your store function's
prepareToWrite method in each map or reduce task before
writing any data. This call passes a
RecordWriter instance to use when writing data.
RecordWriter is a class that
OutputFormat uses to write individual records.
Pig will get the record writer it passes to your store function by
calling getRecordWriter on the output format your store
function returned from getOutputFormat. Your store
function will need to keep this reference so that it can be used in
putNext.
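The contract is easiest to see with the Hadoop types stripped away. In this plain-Java sketch, SimpleRecordWriter is a hypothetical stand-in for Hadoop's RecordWriter, and CollectingWriter is a test double. The store function simply holds on to the writer handed to prepareToWrite and uses it in every putNext call. Real code must also handle the InterruptedException that RecordWriter.write can throw, as JsonStorage's putNext does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Hadoop's RecordWriter<K, V>.
interface SimpleRecordWriter<K, V> {
    void write(K key, V value);
}

// Test double that collects whatever is written to it.
final class CollectingWriter implements SimpleRecordWriter<Object, String> {
    final List<String> values = new ArrayList<>();
    public void write(Object key, String value) { values.add(value); }
}

// Sketch of the prepareToWrite/putNext contract.
final class LineStorageSketch {
    private SimpleRecordWriter<Object, String> writer;

    // Called once per task before any records are written.
    void prepareToWrite(SimpleRecordWriter<Object, String> writer) {
        this.writer = writer;  // keep the reference for putNext
    }

    // Called once per tuple; here a "tuple" is just a list of values.
    void putNext(List<?> tuple) {
        if (writer == null) {
            throw new IllegalStateException("prepareToWrite was not called");
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < tuple.size(); i++) {
            if (i > 0) sb.append('\t');
            sb.append(tuple.get(i));
        }
        writer.write(null, sb.toString());  // null key, as JsonStorage does
    }
}
```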
The example store function
JsonStorage also uses this method to read the
schema out of the UDFContext. It will use this
schema when storing data. Finally, it creates a
JsonFactory for use in
putNext.
// JsonStorage.java
public void prepareToWrite(RecordWriter writer) throws IOException {
// Store the record writer reference so we can use it when it's time
// to write tuples
this.writer = writer;
// Get the schema string from the UDFContext object.
UDFContext udfc = UDFContext.getUDFContext();
Properties p =
udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
String strSchema = p.getProperty("pig.jsonstorage.schema");
if (strSchema == null) {
throw new IOException("Could not find schema in UDF context");
}
// Parse the schema from the string stored in the properties object.
ResourceSchema schema =
new ResourceSchema(Utils.getSchemaFromString(strSchema));
fields = schema.getFields();
// Build a Json factory
jsonFactory = new JsonFactory();
jsonFactory.configure(
JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
}
Writing Records
putNext is the core method in the
store function class. Pig calls this method for every tuple it needs
to store. Your store function needs to take these tuples and produce
the key value pairs that its output format expects. For information
on the Java objects that the data will be stored in and how to extract
them see the section called “Interacting with Pig Values”.
JsonStorage encodes the
contents of the tuple in JSON format and passes the resulting string
to the TextOutputFormat record writer as the value. The
key is left null, so TextOutputFormat writes only the JSON string on each line.
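The encoding step can be sketched with the JDK alone. The hypothetical encoder below is not JsonStorage's code. It writes JSON by hand with minimal string escaping instead of using Jackson, takes field names from a list standing in for the schema, and writes numbers unquoted, matching JsonStorage's WRITE_NUMBERS_AS_STRINGS setting of false. Unlike JsonStorage, which writes map values with toString, this sketch recurses into map values and lists.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, JDK-only sketch of turning one tuple into a JSON line.
final class JsonLineEncoder {

    static String encode(List<String> names, List<?> values) {
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < names.size(); i++) {
            if (i > 0) sb.append(',');
            appendString(sb, names.get(i));
            sb.append(':');
            // A missing trailing field is written as null.
            appendValue(sb, i < values.size() ? values.get(i) : null);
        }
        return sb.append('}').toString();
    }

    private static void appendValue(StringBuilder sb, Object v) {
        if (v == null) {
            sb.append("null");
        } else if (v instanceof Number) {
            sb.append(v);  // numbers are written unquoted
        } else if (v instanceof Map) {
            sb.append('{');
            boolean first = true;
            for (Map.Entry<?, ?> e : ((Map<?, ?>) v).entrySet()) {
                if (!first) sb.append(',');
                first = false;
                appendString(sb, String.valueOf(e.getKey()));
                sb.append(':');
                appendValue(sb, e.getValue());
            }
            sb.append('}');
        } else if (v instanceof List) {
            sb.append('[');
            List<?> list = (List<?>) v;
            for (int i = 0; i < list.size(); i++) {
                if (i > 0) sb.append(',');
                appendValue(sb, list.get(i));
            }
            sb.append(']');
        } else {
            appendString(sb, v.toString());
        }
    }

    // Minimal JSON string escaping: quotes, backslashes, control characters.
    private static void appendString(StringBuilder sb, String s) {
        sb.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        sb.append('"');
    }
}
```

The returned string corresponds to the value that putNext hands to the record writer together with a null key.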
// JsonStorage.java
public void putNext(Tuple t) throws IOException {
// Build a ByteArrayOutputStream to write the JSON into
ByteArrayOutputStream baos = new ByteArrayOutputStream(BUF_SIZE);
// Build the generator
JsonGenerator json =
jsonFactory.createJsonGenerator(baos, JsonEncoding.UTF8);
// Write the beginning of the top level tuple object
json.writeStartObject();
for (int i = 0; i < fields.length; i++) {
writeField(json, fields[i], t.get(i));
}
json.writeEndObject();
json.close();
// Hand a null key and our string to Hadoop
try {
writer.write(null, new Text(baos.toByteArray()));
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
private void writeField(JsonGenerator json,
ResourceFieldSchema field,
Object d) throws IOException {
// If the field is missing or the value is null, write a null
if (d == null) {
json.writeNullField(field.getName());
return;
}
// Based on the field's type, write it out
switch (field.getType()) {
case DataType.INTEGER:
json.writeNumberField(field.getName(), (Integer)d);
return;
case DataType.LONG:
json.writeNumberField(field.getName(), (Long)d);
return;
case DataType.FLOAT:
json.writeNumberField(field.getName(), (Float)d);
return;
case DataType.DOUBLE:
json.writeNumberField(field.getName(), (Double)d);
return;
case DataType.BYTEARRAY:
json.writeBinaryField(field.getName(), ((DataByteArray)d).get());
return;
case DataType.CHARARRAY:
json.writeStringField(field.getName(), (String)d);
return;
case DataType.MAP:
json.writeFieldName(field.getName());
json.writeStartObject();
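for (Map.Entry<String, Object> e : ((Map<String, Object>)d).entrySet()) {
// Map values may be null; write a JSON null instead of calling toString on null
if (e.getValue() == null) {
json.writeNullField(e.getKey());
} else {
json.writeStringField(e.getKey(), e.getValue().toString());
}
}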
json.writeEndObject();
return;
case DataType.TUPLE:
json.writeFieldName(field.getName());
json.writeStartObject();
ResourceSchema s = field.getSchema();
if (s == null) {
throw new IOException("Schemas must be fully specified to use "
+ "this storage function. No schema found for field " +
field.getName());
}
ResourceFieldSchema[] fs = s.getFields();
for (int j = 0; j < fs.length; j++) {
writeField(json, fs[j], ((Tuple)d).get(j));
}
json.writeEndObject();
return;
case DataType.BAG:
json.writeFieldName(field.getName());
json.writeStartArray();
s = field.getSchema();
if (s == null) {
throw new IOException("Schemas must be fully specified to use "
+ "this storage function. No schema found for field " +
field.getName());
}
fs = s.getFields();
if (fs.length != 1 || fs[0].getType() != DataType.TUPLE) {
throw new IOException("Found a bag without a tuple "
+ "inside!");
}
// Drill down the next level to the tuple's schema.
s = fs[0].getSchema();
if (s == null) {
throw new IOException("Schemas must be fully specified to use "
+ "this storage function. No schema found for field " +
field.getName());
}
fs = s.getFields();
for (Tuple t : (DataBag)d) {
json.writeStartObject();
for (int j = 0; j < fs.length; j++) {
writeField(json, fs[j], t.get(j));
}
json.writeEndObject();
}
json.writeEndArray();
return;
}
}
Failure Cleanup
When jobs fail after execution has started, your
store function may need to clean up partially stored results. Pig will
call cleanupOnFailure to give your store function an
opportunity to do this. It passes the location string and the job
object so that your store function knows what it should clean up. In the
HDFS case, the default implementation handles removing any output files
created by the store function. You only need to implement this method
if you are storing data somewhere other than HDFS.
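For a store function that writes somewhere other than HDFS, cleanup means undoing whatever partial output exists at the location. The sketch below is for illustration only. It assumes a store function that manages its own output directory outside Hadoop's FileSystem API and deletes that tree with java.nio. The method name mirrors cleanupOnFailure, but the parameter list is simplified: the real method also receives the Job.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical cleanup for a store function writing to a local directory.
final class LocalCleanupSketch {

    static void cleanupOnFailure(String location) throws IOException {
        Path root = Paths.get(location);
        if (!Files.exists(root)) {
            return;  // nothing was written, nothing to undo
        }
        // Reverse path order deletes children before their parent directory.
        try (Stream<Path> paths = Files.walk(root)) {
            Path[] ordered = paths.sorted(Comparator.reverseOrder())
                                  .toArray(Path[]::new);
            for (Path p : ordered) {
                Files.delete(p);
            }
        }
    }
}
```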
Storing Metadata
If your storage format can store schemas in
addition to data, then your store function can implement the interface
StoreMetadata. This provides a
storeSchema method that Pig calls as part of its
front end operations. It passes a
ResourceSchema, the location string, and the job
object so that your store function can connect to its storage. The
ResourceSchema is very similar to the
Schema class described in the section called “Input and Output Schemas”. There is one important difference. In
ResourceFieldSchema, the schema object associated
with a bag always has one field, which is a tuple. The schema for the
tuples in the bag is described by that tuple's
ResourceFieldSchema.
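Consider a bag declared as visits:bag{t:tuple(url:chararray, ts:long)}. The bag's schema holds a single tuple field, and the record fields url and ts are that tuple's children. The sketch below uses a hypothetical, simplified FieldSchema class in place of ResourceFieldSchema. It performs the same drill-down that the BAG case in JsonStorage's writeField does.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of ResourceFieldSchema.
final class FieldSchema {
    final String name;
    final String type;                 // "int", "chararray", "tuple", "bag", ...
    final List<FieldSchema> children;  // nested schema, empty if none

    FieldSchema(String name, String type, FieldSchema... children) {
        this.name = name;
        this.type = type;
        this.children = Collections.unmodifiableList(Arrays.asList(children));
    }
}

final class BagSchemas {
    // A bag's schema holds exactly one field, a tuple; the fields of the
    // bag's records are that tuple's children.
    static List<FieldSchema> recordFields(FieldSchema bag) throws IOException {
        if (!"bag".equals(bag.type)) {
            throw new IOException(bag.name + " is not a bag");
        }
        if (bag.children.size() != 1 || !"tuple".equals(bag.children.get(0).type)) {
            throw new IOException("Found a bag without a tuple inside: " + bag.name);
        }
        return bag.children.get(0).children;
    }
}
```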
The example store function
JsonStorage stores the schema in a side file
named _schema in the same directory as the data.
The schema is stored as the string produced by
ResourceSchema's toString method.
// JsonStorage.java
public void storeSchema(ResourceSchema schema, String location, Job job)
throws IOException {
// Store the schema in a side file in the same directory. MapReduce
// does not include files starting with "_" when reading data for a job.
FileSystem fs = FileSystem.get(job.getConfiguration());
DataOutputStream out = fs.create(new Path(location + "/_schema"));
out.writeBytes(schema.toString());
out.writeByte('\n');
out.close();
}
StoreMetadata also has a
storeStatistics function, but Pig does not use this
yet.
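The side-file convention can be demonstrated without Hadoop. This sketch uses java.nio to write a _schema file next to some part files. It then lists the directory the way an input format would, skipping names that begin with an underscore. Hadoop's FileInputFormat also skips names that begin with a dot, and the sketch does the same. The helper names and the schema string are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SideFileSketch {

    // Write the schema string plus a newline to <dir>/_schema,
    // analogous to what JsonStorage's storeSchema does on HDFS.
    static void storeSchema(String schema, Path dir) throws IOException {
        Files.write(dir.resolve("_schema"),
                    (schema + "\n").getBytes(StandardCharsets.UTF_8));
    }

    // List the files a job would treat as data, skipping side files and
    // hidden files, in sorted order.
    static List<String> dataFiles(Path dir) throws IOException {
        List<String> names = new ArrayList<>();
        try (DirectoryStream<Path> ds = Files.newDirectoryStream(dir)) {
            for (Path p : ds) {
                String n = p.getFileName().toString();
                if (!n.startsWith("_") && !n.startsWith(".")) {
                    names.add(n);
                }
            }
        }
        Collections.sort(names);
        return names;
    }
}
```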
[27] A file in the same directory that is not a
part file. Side files start with an underscore
character. MapReduce's FileInputFormat knows
to ignore them when reading input for a job.
[28] Meaning that the filter can be broken into two
filters, one that contains the partition keys and one that does not,
and produce the same end result. This is possible when the
expressions are connected by and, but not when they are
connected by or.
[29] HCatalog is a table management service for Hadoop. It includes Pig load and store functions. See the section called “Metadata In Hadoop” for more information on HCatalog.