Chapter 5. Introduction to Pig Latin
It is time to dig into Pig Latin. This chapter provides you with the basics of Pig Latin, enough to write your first useful scripts. More advanced features of Pig Latin are covered in Chapter 6, Advanced Pig Latin.
Preliminary Matters
Relations and Fields
Pig Latin is a data flow language. Each
processing step results in a new data set, or
relation. In input = load 'data',
input is the name of the relation that results from loading
the data set data. A relation name is
referred to as an alias. Relation names look
like variables, but they are not. Once made, an assignment is
permanent. It is possible to re-use relation names; for example, this
is legitimate:
A = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
A = filter A by dividends > 0;
A = foreach A generate UPPER(symbol);
However, it is not recommended. It looks here
as if you are reassigning A. But really what you are doing is creating
new relations called A, losing track of the old relations called A.
Pig is smart enough to keep up. But it is still not a good practice.
It leads to confusion when trying to read your programs (which
A am I referring to?) and when reading error
messages.
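As a sketch of the alternative, the same data flow can be written with a separate alias for each step. The alias names divs, positive, and symbols are illustrative choices, not names taken from the script above.

```pig
-- Same data flow, but each step gets its own alias.
divs     = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
positive = filter divs by dividends > 0;
symbols  = foreach positive generate UPPER(symbol);
```

With distinct names, an error message that mentions positive points at exactly one statement.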
In addition to relation names, Pig Latin also has
field names. They name a field (or column) in a relation. In the
previous snippet of Pig Latin, dividends and
symbol are examples of field names. These are
somewhat like variables in that they will contain a different value for
each record as it passes through the pipeline. But you cannot assign
values to them.
Both relation and field names must start with an alphabetic character, and then can have zero or more alphabetic, numeric, or “_” (underscore) characters. All characters in the name must be ASCII.
Case Sensitivity
Unfortunately, Pig Latin cannot decide whether
it is case sensitive or not. Keywords in Pig Latin are not case
sensitive; for example, LOAD is equivalent to load. But
relation and field names are. So A = load 'foo'; is not
equivalent to a = load 'foo';. UDF names are also case
sensitive; thus COUNT is not the same UDF as
count.
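The following sketch puts the three rules side by side. The alias Divs is a hypothetical name chosen only to contrast with divs.

```pig
-- Keywords are not case sensitive: LOAD/AS and load/as mean the same thing.
divs = LOAD 'NYSE_dividends' AS (exchange, symbol, date, dividends);

-- Aliases and field names are case sensitive: Divs is a new relation,
-- distinct from divs.
Divs = filter divs by dividends > 0;

-- UDF names are case sensitive: the built-in function is UPPER, so
-- writing upper(symbol) would not resolve to it.
syms = foreach Divs generate UPPER(symbol);
```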
Comments
Pig Latin has two types of comment operators,
SQL style single line comments (--) and Java style
multi-line comments (/* */).
A = load 'foo'; --this is a single line comment
/*
 * This is a multi-line comment.
 */
B = load /* a comment in the middle */'bar';
Input and Output
Before you can do anything of interest, you need to be able to add inputs and outputs to your data flows.
Load
The first step to any data flow is to specify
your input. In Pig Latin this is done with the
load statement. By default, load
looks for your data on HDFS in a tab delimited file using the default
load function PigStorage. divs = load
'/data/examples/NYSE_dividends'; will look for a file called
NYSE_dividends in the directory
/data/examples. You can also specify relative path
names. By default your Pig jobs will run in your home directory on
HDFS, /users/yourlogin.
Unless you change directories, all relative paths will be evaluated from
there. You can also specify a full URL for the path, such as
hdfs://nn.acme.com/data/examples/NYSE_dividends to
read the file from the HDFS instance that has
nn.acme.com as a NameNode.
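The three ways of naming an input described above can be sketched as follows. The alias names are illustrative, and the relative path assumes a file called NYSE_dividends exists in your current directory on HDFS.

```pig
-- Absolute path on the default HDFS instance.
divs_abs = load '/data/examples/NYSE_dividends';

-- Relative path, resolved against the current working directory on HDFS
-- (your home directory unless you have changed directories).
divs_rel = load 'NYSE_dividends';

-- Full URL that names the NameNode explicitly.
divs_url = load 'hdfs://nn.acme.com/data/examples/NYSE_dividends';
```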
In practice most of your data will not be in tab
separated text files. You may also be loading data from storage systems
other than HDFS. Pig allows you to specify the function to use to load
your data with the using clause. For example, if you
wanted to load your data from HBase you would use the loader for
HBase:
divs = load 'NYSE_dividends' using HBaseStorage();
If you do not specify a load function, the built
in function PigStorage will be used. You can also pass
arguments to your load function via the using clause. For
example, if you are reading comma separated text data,
PigStorage takes an argument to indicate which character to
use as a separator.
divs = load 'NYSE_dividends' using PigStorage(',');
The load statement also can have an
as clause. This allows you to specify the schema of the
data you are loading. The syntax and semantics of declaring schemas in
Pig Latin are discussed in the section called “Schemas”.
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
When specifying a “file” to read
from HDFS, you can specify directories. In this case Pig will find all
files under the directory you specify and use them as input for that
load statement. So if you had a directory
input with two data files
today and yesterday under it,
and you specified input as your file to load, Pig
will read both today and
yesterday as input. If the directory you specify
has other directories, files in those directories will be included as
well.
The built-in Pig load functions that operate on
HDFS files, PigStorage and TextLoader,
support globs.[6] This allows you to read multiple files
that are not under the same directory, or to read some but
not all files under a directory. Table 5.1, “Globs in Hadoop 0.20” describes globs that are valid in Hadoop 0.20.
Be aware that glob meaning is determined by HDFS underneath Pig, so
which globs will work for you will depend on the version of HDFS you
have. Also, if you are issuing Pig Latin commands from a Unix shell
command line you will need to escape many of the glob characters to
prevent your shell from expanding them.
Table 5.1. Globs in Hadoop 0.20
| glob | comment |
|---|---|
| ? | Matches any single character. |
| * | Matches zero or more characters. |
| [abc] | Matches a single character from character set (a,b,c). |
| [a-z] | Matches a single character from the character range (a..z), inclusive. The first character must be lexicographically less than or equal to the second character. |
| [^abc] | Matches a single character that is not in the character set (a, b, c). The ^ character must occur immediately to the right of the opening bracket. |
| [^a-z] | Matches a single character that is not from the character range (a..z) inclusive. The ^ character must occur immediately to the right of the opening bracket. |
| \c | Removes (escapes) any special meaning of character c. |
| {ab,cd} | Matches a string from the string set {ab, cd} |
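A few of these globs in use, as a sketch. The directory layout (one directory per day under /data/logs, plus a parallel /data/archive tree) is hypothetical and serves only to make the patterns concrete.

```pig
-- Hypothetical layout: one directory per day, such as /data/logs/2011-01-01.
-- Read every day of January 2011.
january = load '/data/logs/2011-01-*';

-- Read only the first nine days of the month: 2011-01-01 through 2011-01-09.
first_days = load '/data/logs/2011-01-0[1-9]';

-- Read the same day from two different parent directories in one load.
two_sets = load '/data/{logs,archive}/2011-01-01';
```

Inside a script file these patterns need no escaping. Escaping only matters when the command passes through a Unix shell first.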
Store
After you have finished processing your data,
you will want to write it out somewhere. Pig provides the
store statement for this purpose. In many ways
it is the mirror image of the load statement. By default, Pig stores
your data on HDFS in a tab delimited file using
PigStorage[7].
store processed into '/data/examples/processed';
Pig will write the results of your processing
into a file processed in the directory
/data/examples. You can specify relative path
names. You can also specify a full URL for the path, such as
hdfs://nn.acme.com/data/examples/processed.
If you do
not specify a store function, PigStorage will be used. You
can specify a different store function with a using
clause.
store processed into 'processed' using HBaseStorage();
You can also pass arguments to your store function. For example, if you want to store your data as comma separated text data, PigStorage takes an argument to indicate which character to use as a separator.
store processed into 'processed' using PigStorage(',');
As noted in the section called “Running Pig”, when
writing to a file system processed will be a
directory with part files rather than a single file. But how many part
files will be created? That depends on the parallelism of the last job
before the store. If that job has a reduce phase, the number of part
files will be determined by the parallel level set for that job. See the
section called “Parallel” for information on how this is determined. If
it is a map-only job then the number will be determined by the number of
maps, which is controlled by Hadoop and not Pig.
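As a sketch of the reduce case, the script below sets the parallel level explicitly with the parallel clause. The output name stock_counts and the value 10 are arbitrary choices for illustration.

```pig
daily = load 'NYSE_daily' as (exchange, stock);
-- The group forces a reduce phase; parallel 10 asks for 10 reducers.
grpd  = group daily by stock parallel 10;
cnt   = foreach grpd generate group, COUNT(daily);
-- The store follows a job with 10 reducers, so the output directory
-- stock_counts would be expected to hold 10 part files.
store cnt into 'stock_counts';
```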
Dump
In most cases you will want to store your data somewhere when you are done processing it. But occasionally you will want to see it on the screen. This is particularly useful during debugging and prototyping sessions. It can also be useful for quick, ad hoc jobs. dump directs the output of your script to your screen.
dump processed;
Up through version 0.7, the output of
dump matches the format of constants in Pig Latin. So
longs are followed by an L, floats by an
F, and maps are surrounded by []
(brackets), tuples by () (parentheses), and bags by
{} (braces). Starting with version 0.8 the
L for longs and F for floats were
dropped, though the markers for the complex types were kept. Nulls are
indicated by missing values. Fields are separated by commas. Since
each record in the output is a tuple, it is surrounded by
().
Relational Operations
Relational operators are the main tools Pig Latin provides to operate on your data. They allow you to transform it by sorting, grouping, joining, projecting, and filtering. This section covers the basic relational operators. More advanced features of these operators, as well as advanced relational operators, are covered in the section called “Advanced Relational Operations”. What is covered here will be enough to get you started programming in Pig Latin.
Foreach
foreach takes a set of
expressions and applies them to every record in the data pipeline; hence
the name foreach. From these expressions it generates new records to
send down the pipeline to the next operator. For those familiar with
database terminology, it is Pig's projection operator. For example, the
following code loads an entire record, but then removes all but the
user and id fields from each
record:
A = load 'input' as (user:chararray, id:long, address:chararray, phone:chararray,
preferences:map[]);
B = foreach A generate user, id;
Expressions in Foreach
Foreach supports an array of expressions. The
simplest are constants and field references. The syntax for constants
has already been discussed in the section called “Types”. Field
references can be by name (as shown above) or by position. Positional
references are preceded by a $ (dollar sign) and
start from 0.
prices = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
volume, adj_close);
gain = foreach prices generate close - open;
gain2 = foreach prices generate $6 - $3;
Relations gain and
gain2 will contain the same values. Positional
style references are useful in situations where the schema is unknown
or undeclared.
In addition to using names and positions, you
can refer to all fields using “*” (asterisk). This
produces a tuple that contains all the fields. Beginning in version
0.9, you can also refer to ranges of fields using “..”
(two periods). This is particularly useful when you have many fields
and you do not want to repeat them all in your foreach
command.
prices = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
beginning = foreach prices generate ..open; -- produces exchange, symbol, date, open
middle = foreach prices generate open..close; -- produces open, high, low, close
end = foreach prices generate volume..; -- produces volume, adj_close
Standard arithmetic operators for integers and
floating point numbers are supported: + for
addition, - for subtraction, *
for multiplication, and / for division. These
operators return values of their own type, so 5/2
is 2 while 5.0/2.0 is
2.5. In addition, for integers the modulo operator
% is supported. The unary negative operator
- is also supported for both integers and floating
point numbers. Pig Latin obeys the standard mathematical precedence
rules. For information on what happens when arithmetic operators are
applied across different types (for example 5/2.0),
see the section called “Casts”.
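A short sketch of these operators inside a foreach follows. It declares types in the load so that each expression has a known result type; the divisor 1000 is an arbitrary illustrative value.

```pig
prices = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
         date:chararray, open:float, high:float, low:float, close:float,
         volume:int, adj_close:float);
calc = foreach prices generate
         close - open,     -- float minus float yields a float
         volume / 1000,    -- int divided by int yields an int
         volume % 1000,    -- modulo is defined for integers
         -volume;          -- unary negation
```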
Null values are viral for all arithmetic
operators. That is, x + null = null for all values
of x.
Pig also provides a binary condition operator,
often referred to as bincond. It begins with a
Boolean test, followed by a ?, then the value to
return if the test is true, a :, and finally the
value to return if the test is false. If the test returns null, then
bincond returns null. Both value arguments of the bincond must return
the same type.
2 == 2 ? 1 : 4 --returns 1
2 == 3 ? 1 : 4 --returns 4
null == 2 ? 1 : 4 -- returns null
2 == 2 ? 1 : 'fred' -- type error, both values must be of the same type
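In a script, a bincond usually appears as one expression in a foreach. The sketch below labels each dividend record; the threshold 1.0, the labels, and the field name div_size are illustrative.

```pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
       date:chararray, dividends:float);
-- Both branches are chararrays, as bincond requires.
-- A null dividends makes the test null, so the label is null as well.
labeled = foreach divs generate symbol,
          (dividends > 1.0 ? 'large' : 'small') as div_size;
```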
To extract data from complex types, use the
projection operators. For maps this is #, the
pound or hash, followed by the name of the key as a string. Keep in
mind that the value associated with a key may be of any type. If you
reference a key that does not exist in the map, the result is a
null.
bball = load 'baseball' as (name:chararray, team:chararray,
position:bag{t:(p:chararray)}, bat:map[]);
avg = foreach bball generate bat#'batting_average';
Tuple projection is done with
., the dot operator. As with top level records,
the field can be referenced by name (if you have a schema for the
tuple) or by position. Referencing a non-existent positional
field in the tuple will return null. Referencing a field name that
does not exist in the tuple will produce an error.
A = load 'input' as (t:tuple(x:int, y:int));
B = foreach A generate t.x, t.$1;
Bag projection is not as straightforward as map and tuple projection. Bags do not guarantee that their tuples are stored in any order, so projecting a field from one particular tuple inside the bag would not be meaningful. Instead, when you project fields in a bag you are creating a new bag with only those fields.
A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.x;
This will produce a new bag whose tuples have
only the field x in them. You can project multiple
fields in a bag by surrounding the fields with parentheses and
separating them by commas.
A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.(x, y);
This seemingly pedantic distinction that
b.x is a bag and not a scalar value has
consequences. Consider the following Pig Latin, which will not
work:
A = load 'foo' as (x:chararray, y:int, z:int);
B = group A by x; -- produces bag A containing all the records for a given value of x
C = foreach B generate SUM(A.y + A.z);
It is clear what the programmer is trying to
do here. But since A.y and A.z
are bags and the addition operator is not defined on bags, this will
produce an error.[8] The correct way
to do the above calculation in Pig Latin is:
A = load 'foo' as (x:chararray, y:int, z:int);
A1 = foreach A generate x, y + z as yz;
B = group A1 by x;
C = foreach B generate SUM(A1.yz);
UDFs in Foreach
User Defined Functions (UDFs) can be invoked in
foreach. These are called evaluation functions, or
eval funcs. Since they are part of a
foreach statement, these UDFs take one record at a time and
produce one output. Keep in mind that either the input or the output
can be a bag, so this one record may contain a bag of
records.
-- udf_in_foreach.pig
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
--make sure all strings are upper case
upped = foreach divs generate UPPER(symbol) as symbol, dividends;
grpd = group upped by symbol; --output a bag upped for each value of symbol
--take a bag of integers, produce one result for each group
sums = foreach grpd generate group, SUM(upped.dividends);
In addition, eval funcs can take
* as an argument, which passes the entire record to
the function. They can also be invoked with no arguments at
all.
For a complete list of UDFs that are provided with Pig see Appendix A, Built In User Defined Functions and Piggybank. For a discussion of how to invoke UDFs not distributed as part of Pig, see the section called “User Defined Functions”.
Naming Fields in Foreach
The result of each foreach
statement is a new tuple, usually with a different schema than the
tuple that was an input to foreach. Pig can infer the
data types of the fields in this schema from the foreach
statement. But it cannot always infer the names of those fields. For
fields that are simple projections with no other operators applied,
Pig keeps the same name as before.
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
sym = foreach divs generate symbol;
describe sym;
sym: {symbol: chararray}
Once any expression beyond simple projection
is applied, Pig does not assign a name to the field. If you do not
explicitly assign a name, the field will be nameless and only
addressable via a positional parameter, e.g. $0.
You can assign a name with the as clause.
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
in_cents = foreach divs generate dividends * 100.0 as dividend, dividends * 100.0;
describe in_cents;
in_cents: {dividend: double,double}
Notice that in foreach the
as is attached to each expression. This is different
from load, where it is attached to the entire load
statement. The reason for this will become clear when we discuss
flatten in the section called “Flatten”.
Filter
The filter statement
allows you to select which records will be retained in your data
pipeline. A filter contains a predicate. If that
predicate evaluates to true for a given record, then that record will be
passed down the pipeline. Otherwise, it will not.
Predicates can contain the comparison operators
you expect: == to test equality, plus
!=, >,
>=, <, and
<=. These comparators can be used on any scalar
data type. == and != can be
applied to maps and tuples. To use these with two tuples, both tuples
must have the same schema or both not have a schema. None of the
comparison operators can be applied to bags.
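Two simple filters on scalar fields, as a sketch. The threshold 0.5 and the ticker symbol 'CME' are illustrative values, not taken from the data set.

```pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
       date:chararray, dividends:float);
-- Keep records with a dividend of at least 0.5.
bigdivs = filter divs by dividends >= 0.5;
-- Keep records for a single ticker symbol.
onesym  = filter divs by symbol == 'CME';
```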
Pig Latin follows the operator precedence that
is standard in most programming languages where arithmetic operators
have precedence over equality operators. So x + y == a +
b is equivalent to (x + y) == (a +
b).
For chararrays, users can test to see if the chararray matches a regular expression.
-- filter_matches.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
startswithcm = filter divs by symbol matches 'CM.*';
Note
Pig uses Java's
regular expression format. This format requires the entire
chararray to match, not just a portion as in Perl style regular
expressions. For example, if you are looking for all fields that
contain the string “fred” you must say
'.*fred.*' and not
'fred'. The latter will only match the chararray
fred.
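The whole-string matching rule can be seen by comparing two patterns on the same field. This is a sketch; the alias names are illustrative.

```pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
       date:chararray, dividends:float);
-- Matches only a symbol that is exactly "CM".
exact    = filter divs by symbol matches 'CM';
-- Matches any symbol that contains "CM" anywhere.
contains = filter divs by symbol matches '.*CM.*';
```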
You can find chararrays that do not match a
regular expression by preceding the test with
not.
-- filter_not_matches.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
notstartswithcm = filter divs by not symbol matches 'CM.*';
You can combine multiple predicates into one
using the Boolean operators and and
or. You can reverse the outcome of any predicate by
using the Boolean not operator. As is standard, the
precedence of Boolean operators, from highest to lowest, is
not, and, or.
Thus a and b or not c is equivalent to (a
and b) or (not c).
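Here is the same precedence rule applied to a real filter, as a sketch. The particular predicates and the value 0.1 are illustrative.

```pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
       date:chararray, dividends:float);
-- Relies on precedence: not binds tightest, then and, then or.
implicit = filter divs by dividends > 0.1 and exchange == 'NYSE'
           or not symbol matches 'CM.*';
-- The same filter with the grouping spelled out.
explicit = filter divs by (dividends > 0.1 and exchange == 'NYSE')
           or (not symbol matches 'CM.*');
```

Writing the parentheses out costs nothing at run time and spares readers from recalling the precedence rules.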
Pig will short circuit Boolean operations when
possible. If the first (left) predicate of an and is
false, the second (right) will not be evaluated. So in 1 == 2
and udf(x) the UDF will never be invoked. Similarly, if the
first predicate of an or is true, the second
predicate will not be evaluated. 1 == 1 or udf(x)
will never invoke the UDF.
For Boolean operators, nulls follow SQL's
three-valued (ternary) logic. Thus x == null results in a value of
null, not true (even when x
is also null) and not false. Filters only
pass through values that are true. So if there was a
field that had three values 2,
null, 4 and you applied a filter
x == 2 to it, only the first record where the value
is 2 would be passed through the filter. Likewise
x != 2 would only return the last record where the
value is 4. The way to look for null values is to
use the is null operator, which returns true whenever
the value is null. To find values that are not null
use is not null.
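As a sketch of the consequence: a pair of filters that looks exhaustive still drops the records with nulls, and those records have to be requested explicitly. The alias names are illustrative.

```pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
       date:chararray, dividends:float);
-- Records whose dividends field is null pass neither of these filters.
positive    = filter divs by dividends > 0.0;
nonpositive = filter divs by dividends <= 0.0;
-- Null records have to be asked for explicitly.
missing = filter divs by dividends is null;
present = filter divs by dividends is not null;
```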
Likewise, null neither
matches nor fails to match any regular expression value.
Just as there are UDFs to be used in evaluation
expressions, there are UDFs specifically for filtering records. These
are called filter funcs. They are eval funcs
that return a Boolean value and can be invoked in the
filter statement. Filter funcs cannot be used in
foreach statements.
Group
The group statement
collects records with the same key together. While it is the first
operator we have looked at that shares its syntax with SQL, it is
important to understand that the grouping operator in Pig Latin is
fundamentally different than the one in SQL. In SQL the group
by clause creates a group that must feed directly into one or
more aggregate functions. In Pig Latin there is no direct connection
between group and aggregate functions. Instead,
group does exactly what it says. It collects all records
with the same value for the provided key together into a bag. You can
then pass this to an aggregate function if you want, or do other things
with it.
-- count.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd = group daily by stock;
cnt = foreach grpd generate group, COUNT(daily);
This example groups records by the key
stock and then counts them. It is just as legitimate
to group them and then store them for processing at a later
time.
-- group.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd = group daily by stock;
store grpd into 'by_group';
The records coming out of the group by statement
have two fields, the key and the bag of collected records. The key
field is named group[9]. The bag is
named for the alias that was grouped. So in the previous examples it
will be named daily. It will have the same schema as
the relation daily. If the relation
daily has no schema then the bag
daily will have no schema. For each record in the
group, the entire record (including the key) is in the bag. Changing
the last line of the above script from store grpd... to
describe grpd; will produce:
grpd: {group: bytearray,daily: {exchange: bytearray,stock: bytearray}}
You can also group on multiple keys. The
keys must be surrounded by parentheses. The resulting records still have
two fields. In this case the group field is a tuple
with a field for each key.
--twokey.pig
daily = load 'NYSE_daily' as (exchange, stock, date, dividends);
grpd = group daily by (exchange, stock);
avg = foreach grpd generate group, AVG(daily.dividends);
describe grpd;
grpd: {group: (exchange: bytearray,stock: bytearray),daily: {exchange: bytearray,
stock: bytearray,date: bytearray,dividends: bytearray}}
You can also group all of the records in your
pipeline together using all.
--countall.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd = group daily all;
cnt = foreach grpd generate COUNT(daily);
The record coming out of group all
has the chararray literal “all” as a key. Usually this
does not matter because you will pass the bag directly to an aggregate
function like COUNT. But if you plan to store the
record or use it for another purpose you may want to project out the
artificial key first.
group is the first operator we have
looked at that will, usually, force a reduce phase. Grouping means
collecting all records where the key has the same value. If the
pipeline was in a map phase this will force it to shuffle and then
reduce. If the pipeline was already in a reduce it will force it to
pass through map, shuffle, and reduce phases.
Since grouping collects all
records together with the same value for the key, you often get skewed
results. That is, just because you have specified that your job have
100 reducers, there is no reason to expect that the number of values per
key will be evenly distributed. They may have a Gaussian or power law
distribution.[10] Consider for example if you have an
index of web pages and you group by the base url. Certain values such
as yahoo.com are going to have far more values than
most. This will mean that some reducers get far more data than others.
Since your MapReduce job is not finished (and any subsequent ones cannot
start) until all your reducers have finished, this skew will
significantly slow your processing. In some cases it will also be
impossible for one reducer to manage that much data.
Pig has a number of ways that it tries to manage this skew to balance out the load across your reducers. The one that applies to grouping is Hadoop's combiner. For details of how Hadoop's combiner works see the section called “Combiner Phase”. The combiner pre-aggregates the records for each key on the map side, so each map task sends a reducer roughly one record per key rather than every raw record. This does not remove all skew, but it places a bound on it: the number of records a reducer receives for a key is on the order of the number of map tasks. And since, in most jobs, the number of mappers will be at most in the tens of thousands, even if the reducers get a skewed number of records, the absolute number of records per reducer will be small enough that the reducers can handle them quickly.
Unfortunately, not all calculations can be done using the combiner. Calculations like sum that can be decomposed into any number of steps are called distributive. These fit nicely into the combiner. Calculations that can be decomposed into an initial step, any number of intermediate steps, and a final step are referred to as algebraic. Count is an example of such a function, where the initial step is a count and the intermediate and final steps are sums. Distributive is a special case of algebraic, where the initial, intermediate, and final steps are all the same. Session analysis, where you want to track a user's actions on a website, is an example of a calculation that is not algebraic. You must have all the records sorted by timestamp before you can start analyzing their interaction with the site.
Pig's operators and built in UDFs use the combiner
whenever possible, because of its skew reducing features and because
early aggregation greatly reduces the amount of data shipped over the
network and written to disk, thus speeding performance significantly.
UDFs can indicate when they can work with the combiner by implementing
the Algebraic interface. For information on how
to make your UDFs use the combiner, see the section called “Algebraic Interface”.
For information on how to determine how parallel
to execute your group operation, see the section called “Parallel”. Also, keep in mind that when using group
all, you are necessarily serializing your pipeline. That is,
this step, and any step after it until you split out the single bag now
containing all of your records, will not be done in parallel.
Finally, group handles nulls in the
same way that SQL handles them, by collecting all records with a null
key into the same group. Note that this is in direct contradiction to
the way expressions handle nulls (remember that neither null ==
null nor null != null is true) and to
the way join handles nulls (see the section called “Join”).
Order by
The order statement sorts
your data for you. It produces a total order of your output data.
Total order means that not only is the data sorted in each partition of
your data, it is also guaranteed that all records in partition
n are less than or equal to all records in partition n +
1 for all n. When your data is stored on
HDFS, where each partition is a part file, this means that
cat will output your data in order.
The syntax of order is similar to
group. You indicate a key or set of keys by which you wish
to order your data. One glaring difference is that there are no
parentheses around the keys when multiple keys are indicated in
order.
--order.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
date:chararray, open:float, high:float, low:float, close:float,
volume:int, adj_close:float);
bydate = order daily by date;
--order2key.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
date:chararray, open:float, high:float, low:float,
close:float, volume:int, adj_close:float);
bydatensymbol = order daily by date, symbol;
It is also possible to reverse the order of the
sort by appending desc to a key in the sort. In
order statements with multiple keys desc
applies only to the key it immediately follows. Other keys will still
be sorted in ascending order.
--orderdesc.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
date:chararray, open:float, high:float, low:float, close:float,
volume:int, adj_close:float);
byclose = order daily by close desc, open;
dump byclose; -- open still sorted in ascending order
Data is sorted based on the types of the
indicated fields; numeric values are sorted numerically, and chararray
fields are sorted lexically. Byte array fields are sorted lexically,
using byte values rather than character values. Sorting by maps,
tuples, or bags produces errors. For all data types, nulls are taken to
be smaller than all possible values for that type, and thus will always
appear first (or last if desc is used).
As discussed above in the section called “Group”, skew of the values in data is very common. This
affects order just as it does group, causing
some reducers to take significantly longer than others. To address
this, Pig balances the output across reducers. It does this by first
sampling the input of the order statement to get an
estimate of the key distribution. Based on this sample it then builds a
partitioner that produces a balanced total order (for details on what a
partitioner is see the section called “Shuffle Phase”). For
example, suppose you are ordering on a chararray field with the values
a, b, e, e, e, e, e, e, m, q, r, z and you have three
reducers. The partitioner in this case would decide to partition your
data such that values a - e go to reducer 1,
e goes to reducer 2, and m - z go
to reducer 3. Notice that the value e can be sent to
either reducer 1 or 2. Some records with key e will
be sent to reducer 1 and some to 2. This allows the partitioner to
evenly distribute the data. In practice we rarely see variance in
reducer time exceed 10% using this algorithm.
An important side effect of the way Pig
distributes records to minimize skew is that it breaks the MapReduce
convention that all instances of a given key are sent to the same
partition. If you have other processing that depends on this convention
do not use Pig's order statement to sort data for
it.
order always causes your data
pipeline to go through a reduce phase. This is necessary to collect all
equal records together. Also, Pig adds an additional MapReduce job to
your pipeline to do the sampling. Since this sampling is very
lightweight (it reads only the first record of every block) it generally
takes less than 5% of the total job time.
Distinct
The distinct statement is very simple. It removes duplicate records. It only works on entire records, not on individual fields.
--distinct.pig
-- find a distinct list of ticker symbols for each exchange
-- This load will truncate the records, picking up just the first two fields.
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray);
uniq = distinct daily;
Since it needs to collect like records together in order to determine if they are duplicates, distinct forces a reduce phase. It does make use of the combiner to remove any duplicate records it can in the map phase.
The use of distinct shown here is
equivalent to select distinct x in SQL. To learn how to do
the equivalent of select count(distinct x) see the section called “Nested Foreach”.
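Because distinct works only on entire records, getting the distinct values of a single field means projecting that field first. A minimal sketch, with illustrative alias names:

```pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray);
-- distinct compares whole records, so narrow each record to one field first.
syms      = foreach daily generate symbol;
uniq_syms = distinct syms;
```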
Join
join is one of the
workhorses of data processing, and likely to be in many of your Pig
Latin scripts. join selects records from one input to put
together with records from another input. This is done by indicating
keys for each input. When those keys are
equal,[11] the
two rows are joined. Records for which no match is found are
dropped.
--join.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
volume, adj_close);
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd = join daily by symbol, divs by symbol;
You can also join on multiple keys. In all cases you must have the same number of keys, and they must be of the same or compatible types (where compatible means that an implicit cast can be inserted, see the section called “Casts”).
-- join2key.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
volume, adj_close);
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd = join daily by (symbol, date), divs by (symbol, date);
Like foreach, join
preserves the names of the fields of the inputs passed to it. It also
prepends the name of the relation the field came from, followed by a
::. Adding describe jnd; to the end of
the previous example produces:
jnd: {daily::exchange: bytearray,daily::symbol: bytearray,daily::date: bytearray,
daily::open: bytearray,daily::high: bytearray,daily::low: bytearray,
daily::close: bytearray,daily::volume: bytearray,daily::adj_close: bytearray,
divs::exchange: bytearray,divs::symbol: bytearray,divs::date: bytearray,
divs::dividends: bytearray}
The daily:: prefix does not
need to be used unless the field name is no longer unique in the record.
In this example, you will need to use daily::date or
divs::date if you wish to refer to one of the
date fields after the join. But for fields such as
open and dividends you do not, because
there is no ambiguity.
Pig also supports outer
joins. In outer joins records that do not have a match on
the other side are included, with null values being filled in for the
missing fields. Outer joins can be left,
right, or full. A left outer join means
records from the left side will be included even if they do not have a
match on the right side. Likewise, a right outer join means records
from the right side will be included even if they do not have a match on
the left side. A full outer join means records from both sides are
taken even when they do not have matches.
--leftjoin.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
volume, adj_close);
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd = join daily by (symbol, date) left outer, divs by (symbol, date);
outer is a noise word, and can be
omitted. Unlike some SQL implementations, full is not a
noise word. C = join A by x outer, B by u; will generate a
syntax error, not a full outer join.
Outer joins are only supported when Pig knows the schema of the data on the side(s) it may need to fill in nulls for. Thus for left outer joins it must know the schema of the right side, for right outer joins it must know the schema of the left side, and for full outer joins it must know both. This is because, without the schema, Pig will not know how many null values to fill in.[12]
As in SQL, null values for keys do not match anything, even null values from the other input. So, for inner joins, all records with null key values are dropped. For outer joins they will be retained, but will not match any records from the other input.
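The treatment of null keys, and the reason an outer join needs the schema of the side it pads, can be illustrated with a small Python model. This is only a sketch of the semantics described above: the function, its parameters, and the data are hypothetical, None stands in for a Pig null, and the join key is assumed to be the first field of each tuple.

```python
def join(left, right, right_width, outer_left=False):
    """Equi-join on the first field of each tuple; None models a null key."""
    out = []
    for l in left:
        # A null key never matches anything, not even another null key.
        matches = [r for r in right if l[0] is not None and r[0] == l[0]]
        if matches:
            out.extend(l + r for r in matches)
        elif outer_left:
            # The right side's schema (its width) says how many nulls to fill in.
            out.append(l + (None,) * right_width)
    return out

left = [("a", 1), (None, 2), ("b", 3)]
right = [("a", 10), (None, 20)]

inner = join(left, right, right_width=2)
left_outer = join(left, right, right_width=2, outer_left=True)
```

Here inner keeps only the record with key "a", while left_outer also retains the null-keyed and unmatched left records, each padded with two nulls.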
Pig can also do multiple joins in a single operation, as long as they are all being joined on the same key(s). This can only be done for inner joins.
A = load 'input1' as (x, y);
B = load 'input2' as (u, v);
C = load 'input3' as (e, f);
alpha = join A by x, B by u, C by e;
Self joins are supported, though the data must be loaded twice.
--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends);
divs2 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends);
jnd = join divs1 by symbol, divs2 by symbol;
increased = filter jnd by divs1::date < divs2::date and
divs1::dividends < divs2::dividends;
If the above were changed to the following, it would fail:
--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends);
jnd = join divs1 by symbol, divs1 by symbol;
increased = filter jnd by divs1::date < divs2::date and
divs1::dividends < divs2::dividends;
It seems like this ought to work, since Pig could
split the divs1 data set and send it to join twice.
But the problem is that field names would be ambiguous after the join.
So the load statement must be written twice. The next best thing would
be for Pig to figure out that these two load statements are
loading the same input and to only run the load once. But it does not do
that currently.
Pig does these joins in MapReduce by using the
map phase to annotate each record with which input it came from. It
then uses the join key as the shuffle key. Thus join
forces a new reduce phase. Once all of the records with the same value
for the key are collected together, Pig does a cross product between the
records from both inputs. To minimize memory usage it has MapReduce
order the records coming into the reducer using the input annotation it
added in the map phase. Thus all of the records for the left input
arrive first. Pig caches these in memory. All of the records for the
right input arrive second. As each of these records arrives it is
crossed with each record from the left side to produce an output record.
In a multi-way join the left n - 1 inputs are held in
memory, and the nth is streamed through. It is
important to keep this in mind when writing joins in your Pig queries if
you know that one of your inputs has more records per value of the key.
Placing that input on the right side of your join will lower memory
usage and possibly increase your script's performance.
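The reduce-side join just described can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not Pig's code: the function name and the sample data are invented, the join key is assumed to be the first field, and null keys are ignored. It shows why the left input is the one held in memory.

```python
from collections import defaultdict

def reduce_side_join(left, right, key_index=0):
    """Model of a two-way join: tag in the map, group by key, cache left, stream right."""
    # Map phase: annotate each record with the input it came from (0 = left, 1 = right).
    tagged = [(rec[key_index], 0, rec) for rec in left]
    tagged += [(rec[key_index], 1, rec) for rec in right]

    # Shuffle: the join key is the shuffle key, so records with equal keys meet.
    groups = defaultdict(list)
    for key, tag, rec in tagged:
        groups[key].append((tag, rec))

    out = []
    for key, recs in groups.items():
        # Order by the input annotation so that all left records arrive first.
        recs.sort(key=lambda pair: pair[0])
        cached = []  # left records for this key, held in memory
        for tag, rec in recs:
            if tag == 0:
                cached.append(rec)
            else:
                # Each right record is streamed and crossed with the cached left side.
                out.extend(l + rec for l in cached)
    return out

daily = [("CLI", 8.5), ("CUB", 12.0)]
divs = [("CLI", 0.1), ("CLI", 0.2), ("ZZZ", 0.3)]
jnd = reduce_side_join(daily, divs)
```

Only the list named cached grows with the number of left records per key, which is why the input with more records per key value belongs on the right.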
Limit
Sometimes you only want to see a limited number
of results. limit allows you to do this.
--limit.pig
divs = load 'NYSE_dividends';
first10 = limit divs 10;
The above example will return at most ten lines
(if your input has fewer than ten lines total, then it will return them
all). Note that for all operators except order, Pig does
not guarantee the order in which records are produced. Thus, since
NYSE_dividends has more than ten records, the above
script could return different results every time. Putting an
order immediately before the
limit will guarantee that the same results are returned
every time.
Limit causes an additional reduce phase, since
it needs to collect the records together to count how many it is
returning. It does optimize this phase by limiting the output of each
map, and then applying the limit again in the reducer. In the case
where limit is combined with order, the two
are done together on the map and reduce. That is, on the map side the
records are sorted by MapReduce and the limit applied in the combiner.
They are sorted again by MapReduce as part of the shuffle, and Pig
applies the limit again in the reducer.
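The two-stage limit described above can be modeled as follows. This is a Python sketch under simplifying assumptions, with hypothetical function names; each inner list represents the output of one map task, and the single reducer is modeled by concatenating what the maps emit.

```python
def limit(map_outputs, n):
    """Model of limit: trim each map's output to n, then trim again in the reducer."""
    trimmed = [records[:n] for records in map_outputs]
    collected = [rec for part in trimmed for rec in part]
    return collected[:n]

def order_then_limit(map_outputs, n, key=None):
    """Model of order followed by limit: sort and trim on the map side, then again on reduce."""
    trimmed = [sorted(records, key=key)[:n] for records in map_outputs]
    merged = sorted((rec for part in trimmed for rec in part), key=key)
    return merged[:n]

map_outputs = [[5, 3, 9], [1, 8], [7, 2, 6, 4]]
any_two = limit(map_outputs, 2)
smallest_two = order_then_limit(map_outputs, 2)
```

In this model any_two depends on how records happen to be distributed across maps, while smallest_two is the same no matter how the input is split.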
One optimization that Pig could do, but does not, is to
terminate reading of the input early once it has reached the number
of records specified by limit. So in the example above, if
you hoped to use this to read just a tiny slice of your input, you will
be disappointed. Pig will still read it all.
Sample
sample offers a simple
way to get a sample of your data. It reads through all of your data,
but only returns a percentage of rows. What percentage it returns is
expressed as a double value, between 0 and 1. So, in the following
example, 0.1 indicates 10%.
--sample.pig
divs = load 'NYSE_dividends';
some = sample divs 0.1;
Currently the sampling algorithm is very simple.
The sample A 0.1 is rewritten to filter A by
random() <= 0.1. Obviously this is nondeterministic, so the
results of a script with sample will vary from run to run. Also,
the percentage will not be an exact match, but close. There has been
discussion about adding more sophisticated sampling techniques, but it
has not been done yet.
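The rewrite of sample into a filter on a random number is easy to mimic. The following Python sketch is an illustration of the idea only; the function name and the rng parameter are assumptions added so that the example can be seeded and repeated.

```python
import random

def sample(records, fraction, rng=random):
    """Keep each record independently when a random draw is <= fraction."""
    return [rec for rec in records if rng.random() <= fraction]

rng = random.Random(42)  # seeded only so this illustration is repeatable
rows = list(range(10000))
some = sample(rows, 0.1, rng)
print(len(some))  # close to 1000, but not guaranteed to be exactly 1000
```

Because every record is tested independently, the size of the result fluctuates around the requested percentage rather than hitting it exactly.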
Parallel
One of Pig's core claims is that it provides a language for parallel data processing. One of the tenets of Pig's philosophy is that Pigs are domestic animals (see the section called “Pig Philosophy”), so Pig prefers to let you tell it how parallel to be. To do this it provides the parallel clause.
The parallel clause can be attached
to any relational operator in Pig Latin. However, it only controls
reduce side parallelism. So it only makes sense for operators that
force a reduce phase. These are:
group*, order,
distinct, join*,
limit, cogroup*,
and cross. Operators with an *
have multiple implementations, some of which force a reduce and some
which do not. For details on this and on operators not covered in this
chapter, see Chapter 6, Advanced Pig Latin.
parallel is ignored in local mode, since all operations
happen serially in local mode.
--parallel.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
volume, adj_close);
bysymbl = group daily by symbol parallel 10;
In this example, parallel will
cause the MapReduce job spawned by Pig to have ten reducers.
parallel clauses apply only to the statement they are
attached to. They do not carry through the script. So if this
group were followed by an order,
parallel would need to be set for that order
separately. Most likely the group will reduce your data
size significantly and you will want to change the parallelism.
--parallel.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
volume, adj_close);
bysymbl = group daily by symbol parallel 10;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted = order average by avg desc parallel 2;
If, however, you do not want to set parallel
separately for every reduce-invoking operator in your script, you can
set a script-wide value using the set command.
--defaultparallel.pig
set default_parallel 10;
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
volume, adj_close);
bysymbl = group daily by symbol;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted = order average by avg desc;
In the above script all MapReduce jobs will be
done with ten reducers. When you set a default parallel level, you can
still add a parallel clause to any statement. It will
override the default value. Thus it can be helpful to set a default
value as a base to use in most cases, and only specifically add a
parallel clause when you have an operator that needs a
different value.
All of this is rather static however. What happens if you run the same script across different inputs which have different characteristics? Or if your input data varies significantly sometimes? You do not want to have to edit your script each time. Using parameter substitution you can write your parallel clauses with variables, providing values for those variables at run time. See the section called “Parameter Substitution” for details.
So far we have been assuming that you know what your parallel value should be. See the section called “Select the Right Level of Parallelism” for information on how to determine that.
Finally, what happens if you do not specify a parallel level? Before version 0.8, Pig lets MapReduce set the parallelism in that case. The MapReduce default parallelism is controlled by your cluster configuration. The installation default value is one, and most people do not change it, which means that most likely you will be running with only one reducer. This is rarely what you want.
To avoid this situation, Pig added a heuristic in 0.8 to do a gross estimate of what parallelism should be set to if it is not set. It looks at the initial input size, assumes there will be no data size changes, and then allocates a reducer for every 1G of data. It must be emphasized that this is not a good algorithm. It is provided only to prevent mistakes that result in scripts running very slowly, and in some extreme cases causing MapReduce itself to have problems. This is a safety net, not an optimizer.
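Putting the pieces of this section together, the choice of reducer count in 0.8 and later can be sketched as a simple precedence rule. The Python below is an approximation under stated assumptions, not Pig's code: the function name is hypothetical, "1G" is read as 10^9 bytes, and the estimate is rounded up with a floor of one reducer.

```python
BYTES_PER_REDUCER = 10**9  # assumption: "1G" taken to mean 10^9 bytes

def reducers_for(input_bytes, parallel=None, default_parallel=None):
    """Pick a reducer count: parallel clause, then default_parallel, then the estimate."""
    if parallel is not None:
        return parallel            # a parallel clause on the statement wins
    if default_parallel is not None:
        return default_parallel    # otherwise the script-wide set default_parallel
    # Fallback heuristic: one reducer per 1G of initial input, rounded up (assumed).
    estimate = (input_bytes + BYTES_PER_REDUCER - 1) // BYTES_PER_REDUCER
    return max(1, estimate)

print(reducers_for(25 * 10**9))                                   # 25
print(reducers_for(25 * 10**9, default_parallel=10))              # 10
print(reducers_for(25 * 10**9, parallel=2, default_parallel=10))  # 2
```

The estimate looks only at the initial input size, so it cannot account for a group or filter that shrinks the data before a later reduce phase.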
User Defined Functions
Much of the power of Pig lies in its ability to allow users to combine the operators provided by Pig with their own or others' code via UDFs. Up through version 0.7, all UDFs must be written in Java. They are implemented as Java classes.[13] This makes it very easy to add new UDFs to Pig by writing a Java class and telling Pig about your Jar file.
As of version 0.8 UDFs can also be written in Python. Pig uses Jython to execute Python UDFs, so they must be compatible with Python 2.5 and not use Python 3 features.
Pig itself comes packaged with some UDFs. Prior to version 0.8 this was a very limited set, the standard SQL aggregate functions and a few others. In 0.8 a large number of standard string processing, math, and complex type UDFs were added. For a complete list and description of built in UDFs, see the section called “Built In UDFs”.
Piggybank is a collection of user contributed UDFs that is packaged and released along with Pig. Piggybank UDFs are not included in the Pig jar, and thus you have to register them manually in your script. See the section called “Piggybank” for more information.
Of course you can also write your own UDFs, or use those written by other users. For details of how to write your own, see Chapter 10, Writing Evaluation and Filter Functions. Finally, you can use some static Java functions as UDFs as well.
Registering UDFs
When you use a UDF that is not already built
into Pig, you have to tell Pig where to look for that UDF. This is done
via the register command. For example, let's say
you want to use the Reverse UDF provided in
Piggybank. For information on where to find the Piggybank jar see
the section called “Piggybank”.
--register.pig
register 'your_path_to_piggybank/piggybank.jar';
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
backwards = foreach divs generate
org.apache.pig.piggybank.evaluation.string.Reverse(symbol);
This example tells Pig that it needs to include
code from your_path_to_piggybank/piggybank.jar
when it produces a jar to send to Hadoop. Pig opens all of the
registered jars, takes out the files, and places them in the jar that it
sends to Hadoop to run your jobs.
In this example, we have to give Pig the full package and class name of the UDF. This verbosity can be alleviated in two ways. The first option is to use the define command, see the section called “Define and UDFs” below. The second option is to include a set of paths on the command line for Pig to search when looking for UDFs. So if instead of invoking Pig as pig register.pig we change our invocation to pig -Dudf.import.list=org.apache.pig.piggybank.evaluation.string register.pig then we can change our script to:
register 'your_path_to_piggybank/piggybank.jar';
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
backwards = foreach divs generate Reverse(symbol);
Using yet another property, we can get rid of the register command as well. If we add -Dpig.additional.jars=/usr/local/pig/piggybank/piggybank.jar to our command line, then the register command is no longer necessary.
In many cases it is better to explicitly deal
with registration and definition issues in the script itself via the
register and define commands than using these
properties. Otherwise everyone who runs your script has to know how to
configure their command line. However, in some situations your scripts
may always use the same set of jars and always look in the same places
for them. For instance, you may have a set of jars used by everyone in
your company. In this case, placing these properties in a shared
properties file and using that with your Pig scripts will make sharing
those UDFs easier and assure that everyone is using the correct versions
of them.
In 0.8 and later versions the register command
can also take HDFS paths. If your jars are stored in HDFS you could
then say register 'hdfs://user/jar/acme.jar';. Starting in
0.9, register accepts globs. So if all of the jars you need are stored
in one directory you could include them all by doing register
'/usr/local/share/pig/udfs/*.jar'.
Registering Python UDFs
register is also used to locate
resources for Python UDFs that you use in your Pig Latin scripts. In
this case you do not register a jar, but rather a Python script that
contains your UDF. The Python script must be in your current directory.
Using the examples provided in the example code, and after copying
udfs/python/production.py to the
data directory, this looks like:
--batting_production.pig
register 'production.py' using jython as bballudfs;
players = load 'baseball' as (name:chararray, team:chararray,
pos:bag{t:(p:chararray)}, bat:map[]);
nonnull = filter players by bat#'slugging_percentage' is not null and
bat#'on_base_percentage' is not null;
calcprod = foreach nonnull generate name, bballudfs.production(
(float)bat#'slugging_percentage',
(float)bat#'on_base_percentage');
The important differences here are the
using jython and as bballudfs portions of
the register statement. using jython tells Pig this UDF
is written in Python, not Java, and it should use Jython to compile
that UDF. Pig does not know where on your system the Jython
interpreter is, so you must include jython.jar in
your classpath when invoking Pig. This can be done by setting the
PIG_CLASSPATH environment variable.
as bballudfs defines a namespace
that UDFs from this file are placed in. All UDFs from this file must
now be invoked as
bballudfs.udfname. Each
Python file you load should be given a separate namespace. This
avoids naming collisions when you register two Python scripts with
duplicate function names.
A caveat: Pig does not trace dependencies
inside your Python scripts and send the needed Python modules to your
Hadoop cluster. You are required to make sure the modules you need
reside on the task nodes in your cluster and that the
PYTHONPATH environment variable is set on
those nodes such that your UDFs will be able to find them for
import. This issue has been fixed after 0.9, but as of this writing
the fix has not yet been released.
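For orientation, a Python UDF file of the kind registered above might look roughly like the following. This is a hypothetical sketch, not the contents of the production.py that ships with the example code: the body of production is an assumption (slugging percentage plus on-base percentage), and the fallback definition of outputSchema exists only so the file can also be run outside of Pig, on the assumption that Pig's Jython engine supplies that decorator when it loads the script.

```python
# production.py (hypothetical contents)

try:
    outputSchema  # assumed to be supplied by Pig's Jython engine
except NameError:
    # Stand-in used only when this file is run outside of Pig.
    def outputSchema(schema):
        def decorate(func):
            func.outputSchema = schema
            return func
        return decorate

@outputSchema("production:float")
def production(slugging_pct, onbase_pct):
    # Assumed definition: production is the sum of the two percentages.
    return slugging_pct + onbase_pct
```

The function uses nothing beyond the two arguments it is passed, so it imports no modules and sidesteps the dependency caveat above.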
Define and UDFs
As was alluded to earlier,
define can be used to provide an alias so that
you do not have to use full package names for your Java UDFs. It can
also be used to provide constructor arguments to your UDFs.
define also is used in defining streaming commands, but
this section only covers its UDF-related features. For information on
using define with streaming, see the section called “Stream”. The following provides an example of using
define to provide an alias for
org.apache.pig.piggybank.evaluation.string.Reverse.
--define.pig
register 'your_path_to_piggybank/piggybank.jar';
define reverse org.apache.pig.piggybank.evaluation.string.Reverse();
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
backwards = foreach divs generate reverse(symbol);
Eval and filter functions can also take one or
more strings as constructor arguments. If you are using a UDF that
takes constructor arguments, define is the place to provide
those arguments. For example, consider a UDF
CurrencyConverter that takes two constructor
arguments, the first indicating which currency you are converting from,
and the second which currency you are converting to.
--define_constructor_args.pig
register 'acme.jar';
define convert com.acme.financial.CurrencyConverter('dollar', 'euro');
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
backwards = foreach divs generate convert(dividends);
Calling Static Java Functions
Java has a rich collection of utilities and libraries. Since Pig is implemented in Java, some of these functions can be exposed to Pig users. Starting in version 0.8, Pig offers invoker methods that allow you to treat certain static Java functions as if they were Pig UDFs.
Any public static Java function that takes no
arguments, or some combination of int, long,
float, double, String, or arrays
thereof[14] and returns int,
long, float, double, or
String can be invoked in this way.
Since Pig Latin does not support overloading on
return types there is an invoker for each return type:
InvokeForInt,
InvokeForLong,
InvokeForFloat,
InvokeForDouble, and
InvokeForString. You must pick the appropriate
invoker for the type you wish to return. Each invoker takes two
constructor arguments. The first is the full package, classname, and
method name. The second is a space-separated list of parameters the
Java function expects. Only the types of the parameters are given. If
the parameter is an array then [] (square
brackets) are appended to the type name. If the method takes no
parameters, the second constructor argument is omitted.
For example, if you wanted to use Java's
Integer class to translate decimal values to
hexadecimal values, you could do:
--invoker.pig
define hex InvokeForString('java.lang.Integer.toHexString', 'int');
divs = load 'NYSE_daily' as (exchange, symbol, date, open, high, low,
close, volume, adj_close);
nonnull = filter divs by volume is not null;
inhex = foreach nonnull generate symbol, hex((int)volume);
If your method takes an array of types, Pig will expect to pass it a
bag where each tuple has a single field of that type. So if you had a
Java method com.acme.Stats.stdev that took
an array of doubles you could use it like this:
define stdev InvokeForDouble('com.acme.Stats.stdev', 'double[]');
A = load 'input' as (id: int, dp:double);
B = group A by id;
C = foreach B generate group, stdev(A.dp);
Warning
Invoker does not use the
Accumulator or Algebraic
interfaces, and is thus likely to be much slower and to use much more
memory than UDFs written specifically for Pig. This means that before you
pass an array argument to an invoked method, you should think carefully
about whether those inefficiencies are acceptable or not. For more
information on these interfaces see the section called “Accumulator Interface” and the section called “Algebraic Interface”.
Invoking Java functions in this way does have a small cost since reflection is used to find and invoke the methods.
Invoker functions throw Java
IllegalArgumentExceptions when they are passed
null input. You should place a filter before the invocation to prevent
this.
[6] Any loader that uses
FileInputFormat as its InputFormat will support
globs. Most loaders that load data from HDFS use this
InputFormat.
[7] A single function can be both a load and store function, as PigStorage is.
[8] You may object that Pig could figure
out what is intended here and do it, since SUM(A.y +
A.z) could be decomposed to “foreach record in
A add y and z and then take the sum”. This is true. But when
we change the group to a cogroup so that there are two bags
A and B involved (see the section called “Cogroup”) and change the sum to SUM(A.y +
B.z) then, since neither A nor B guarantee any
ordering, this is not a well defined operation. In designing
the language we thought it better to be consistent and always
say that bags could not be added rather than allow it
sometimes and not others.
[9] Thus the keyword
group is overloaded in Pig Latin. This is unfortunate and
confusing, but also hard to change now.
[10] In my experience the vast majority of data tracking human activity follows a power law distribution.
[11] Actually, joins can be on any condition, not just equality. Pig only supports joins on equality (called equi-joins). See the section called “Cross” for information on how to do non-equi-joins in Pig.
[12] You may object that Pig could determine this by looking at other records in the join and inferring the correct number of fields. However, this does not work for two reasons. First, when no schema is present Pig does not enforce a semantic that every record has the same schema. So assuming Pig can infer one record from another is not valid. Second, there may be no records in the join that match, thus Pig may have no record to infer from.
[13] This is why UDF names are case sensitive in Pig.
[14] For int, long, float, and double, invoker can call
Java functions that take the scalar types, not the
associated Java classes; so int but not
Integer, etc.




