Chapter 6. Advanced Pig Latin
In the last chapter we worked through the basics of Pig Latin. In this chapter we will plumb its depths, discussing the more advanced operators as well as additional options for operators that were previously introduced. We will also discuss how Pig handles more complex data flows. Finally, we will look at how to use macros and modules to modularize your scripts.
Advanced Relational Operations
Advanced Features of Foreach
In our introduction to foreach (see
the section called “Foreach”), we discussed how it could take a list
of expressions to output for every record in your data pipeline. Now we
will look at ways it can explode the number of records in your pipeline,
and also how it can be used to apply a set of operations to each
record.
Flatten
Sometimes you have data in a bag or a tuple,
and you want to remove that level of nesting. The
baseball data available on GitHub (see the section called “Code Examples in this Book”) can be used as an example. Since a
player can play more than one position, position is stored in a bag.
This allows us to still have one entry per player in the
baseball file.[15] But when you want to switch around your data
on the fly and group by a particular position, you need a way to pull
those entries out of the bag. To do this, Pig provides the
flatten modifier in
foreach.
--flatten.pig
players = load 'baseball' as (name:chararray, team:chararray,
            position:bag{t:(p:chararray)}, bat:map[]);
pos = foreach players generate name, flatten(position) as position;
bypos = group pos by position;
A foreach with a
flatten produces a cross product of every record in the
bag with all of the other expressions in the generate
statement. Looking at the first record in
baseball we see it is (replacing tabs with commas
for clarity):
Jorge Posada,New York Yankees,{(Catcher),(Designated_hitter)},...
Once this has passed through the
flatten statement it will be two records:
Jorge Posada,Catcher
Jorge Posada,Designated_hitter
If there is more than one bag and both are
flattened, then this cross product will be done with members of each
bag as well as other expressions in the generate. So
rather than getting n rows (where
n is the number of records in one bag) you will
get n * m rows.
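To make the cross product concrete, here is a minimal Python sketch that models what flatten does to a single record. It only illustrates the semantics and is not Pig's implementation; the function name flatten_record and its arguments are invented for this example.

```python
from itertools import product

def flatten_record(scalars, bags):
    """Model: foreach ... generate scalars, flatten(bag1), flatten(bag2), ...

    scalars: tuple of the non-flattened field values
    bags:    list of bags, each a list of tuples
    Returns the output records as a list of flat tuples.
    """
    rows = []
    # product() yields one combination per choice of a tuple from every bag,
    # so two bags of n and m tuples give n * m combinations.
    for combo in product(*bags):
        row = tuple(scalars)
        for t in combo:
            row += tuple(t)
        rows.append(row)
    return rows

posada = flatten_record(("Jorge Posada",),
                        [[("Catcher",), ("Designated_hitter",)]])
print(posada)
# [('Jorge Posada', 'Catcher'), ('Jorge Posada', 'Designated_hitter')]
```

Passing two bags of two and three tuples to the same function returns 2 * 3 = 6 rows.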
One side effect that surprises many users is
that if the bag is empty, then no records are produced. So if there
had been an entry in baseball with no position,
either because the bag is null or empty, then that record would not be
contained in the output of flatten.pig. The
record with the empty bag would be swallowed by foreach.
There are a couple of reasons for this behavior. One, since Pig may
or may not have the schema of the data in the bag, it may have no idea
how to fill in nulls for the missing fields. Two, from a mathematical
perspective, this is what you would expect. Crossing a set
S with the empty set results in the empty set. If
you wish to avoid this, you can use a bincond to replace null or empty bags
with a constant bag.
--flatten_noempty.pig
players = load 'baseball' as (name:chararray, team:chararray,
            position:bag{t:(p:chararray)}, bat:map[]);
noempty = foreach players generate name,
            ((position is null or IsEmpty(position)) ? {('unknown')} : position)
            as position;
pos = foreach noempty generate name, flatten(position) as position;
bypos = group pos by position;
flatten can also be applied to a
tuple. In this case it does not produce a cross product. Instead it
elevates each field in the tuple to a top level field. Again, empty
tuples will remove the entire record.
If the fields in a bag or tuple that is being
flattened have names, Pig will carry those names along. As with join,
to avoid ambiguity the field name will have the bag's name and
:: prepended to it. As long as the field name is
not ambiguous you are not required to use the
bagname::.
If you wish to change the names of the fields,
or if the fields did not initially have names, you can attach an
as clause to your flatten as in the example above. If
there is more than one field in the bag or tuple that you are
assigning names to, you must surround the set of field names with
parentheses.
Finally, if you flatten a bag or tuple without
a schema, and you do not provide an as clause, the
resulting records coming out of your foreach will have a
null schema. This is because Pig will not know how many fields the
flatten will result in.[16]
Nested Foreach
So far all of the examples of
foreach that we have seen immediately generate one or
more lines of output. But foreach is more powerful than
this. It can also apply a set of relational operations to each record
in your pipeline. This is referred to as a nested
foreach or inner foreach. One example of how this can be
used is to find the number of unique entries in a group. For example,
to find the number of unique stock symbols for each exchange in the
NYSE_daily data:
--distinct_symbols.pig
daily = load 'NYSE_daily' as (exchange, symbol); -- not interested in other fields
grpd = group daily by exchange;
uniqcnt = foreach grpd {
    sym = daily.symbol;
    uniq_sym = distinct sym;
    generate group, COUNT(uniq_sym);
};
There are several new things here to unpack;
we will walk through each. In this example, rather than
generate immediately following foreach,
a { (open brace) signals
that we will be nesting operators inside this foreach. In
this nested code, each record passed to foreach is handled
one at a time.
In the first line we see a syntax that we have
not seen outside of foreach. In fact sym =
daily.symbol would not be legal outside. It is roughly
equivalent to a top level statement sym = foreach grpd generate
daily.symbol. It is not stated that way inside because it is
not really another foreach. There is no relation for it
to be associated with (that is, grpd is not defined
here). This line takes the bag daily, and
produces a new relation sym which is a bag with
tuples that have only the field symbol.
The second line applies the distinct operator
to the relation sym. Note that even inside
foreach relational operators can only be applied to
relations. They cannot be applied to expressions. The statement
uniq_sym = distinct daily.symbol will produce a syntax
error. daily.symbol is an expression, not a
relation. sym is a relation. This distinction may
seem arbitrary, but it results in Pig Latin having a
coherent definition as a language. Without this, strange statements
like C = distinct 1 + 2 would be legal. One way to think
about this is that the assignment operator inside foreach
can be used to take an expression and create a relation, as happens in
this example.
The last line in a nested foreach
must always be generate. This tells Pig how to take the
results of the nested operations and produce a record to be put in the
outer relation (in this case, uniqcnt). So
generate is the operator that takes the inner relations
and turns them back into expressions for inclusion in the outer
relation. That is, if the script read generate group,
uniq_sym; uniq_sym would be treated as a bag
for the purpose of the generate statement.
Theoretically any Pig Latin relational
operator should be legal inside foreach. However, at
the moment, only distinct, filter,
limit, and order are supported.
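It can help to read distinct_symbols.pig as ordinary code. The following Python sketch models the same data flow on an in-memory list, with each Pig Latin statement noted in a comment. The function name and the sample rows are hypothetical, and this is not how Pig executes the script.

```python
from collections import defaultdict

def distinct_symbols(daily):
    """daily: iterable of (exchange, symbol) records."""
    grpd = defaultdict(list)                # grpd = group daily by exchange
    for exchange, symbol in daily:
        grpd[exchange].append((exchange, symbol))
    out = []
    for group, bag in grpd.items():         # foreach grpd { ... }
        sym = [(s,) for _, s in bag]        # sym = daily.symbol
        uniq_sym = set(sym)                 # uniq_sym = distinct sym
        out.append((group, len(uniq_sym)))  # generate group, COUNT(uniq_sym)
    return out

# Sample rows invented for illustration.
rows = [("NYSE", "CA"), ("NYSE", "CA"), ("NYSE", "CHY"), ("OTHER", "XYZ")]
print(sorted(distinct_symbols(rows)))  # [('NYSE', 2), ('OTHER', 1)]
```

The body of the for loop corresponds to the nested block: it runs once per group, and only its last line contributes a record to the output.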
Let's take a look at a few more examples of how this feature can be useful. It can be used to sort the contents of a bag before the bag is passed to a UDF. This is convenient for UDFs that require all of their input to come in a certain order. Consider a stock analysis UDF that wants to track information about a particular stock over time. The UDF will want input sorted by timestamp.
--analyze_stock.pig
register 'acme.jar';
define analyze com.acme.financial.AnalyzeStock();
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float,
            close:float, volume:int, adj_close:float);
grpd = group daily by symbol;
analyzed = foreach grpd {
    sorted = order daily by date;
    generate group, analyze(sorted);
};
Doing the sorting in Pig Latin, rather than in
your UDF, is important for a couple of reasons. One, it means Pig can
offload the sorting to MapReduce. MapReduce has the ability to sort
data by a secondary key while grouping it. So the order
statement in this case does not require a separate sorting operation.
Two, it means that your UDF does not need to wait for all data to be
available before it starts processing. Instead, it can use the
Accumulator interface (see the section called “Accumulator Interface”) which is much more memory
efficient.
This feature can also be used to find the top k elements in a group. The following example finds the top three dividends paid for each stock.
--hightest_dividend.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
grpd = group divs by symbol;
top3 = foreach grpd {
    sorted = order divs by dividends desc;
    top = limit sorted 3;
    generate group, flatten(top);
};
Currently these nested portions of code are
always run serially for each record handed to them. Of course the
foreach itself will be running in multiple map or reduce
tasks. But each instance of the foreach will not spawn
sub-tasks to do the nested operations in parallel. So if we added a
parallel 10 clause to the grpd = group divs by
symbol statement in the previous example, this ordering and
limiting would be taking place in ten reducers. But each group of
stocks would be sorted and the top three records taken serially within
one of those ten reducers.
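The per-group, serial nature of this work is easy to see in a small Python model of the top-three script. This is a sketch of the semantics only; the function name and the sample records are assumptions made for the example.

```python
from collections import defaultdict

def top_dividends(divs, k=3):
    """divs: iterable of (exchange, symbol, date, dividends) records."""
    grpd = defaultdict(list)                 # grpd = group divs by symbol
    for rec in divs:
        grpd[rec[1]].append(rec)
    out = []
    for group, bag in grpd.items():          # one group at a time, serially
        ordered = sorted(bag, key=lambda r: r[3], reverse=True)  # order ... desc
        for rec in ordered[:k]:              # top = limit sorted 3
            out.append((group,) + rec)       # generate group, flatten(top)
    return out

# Sample records invented for illustration.
sample = [
    ("NYSE", "CA", "2009-01-01", 0.1),
    ("NYSE", "CA", "2009-04-01", 0.4),
    ("NYSE", "CA", "2009-07-01", 0.3),
    ("NYSE", "CA", "2009-10-01", 0.2),
    ("NYSE", "CHY", "2009-01-01", 0.5),
]
top = top_dividends(sample)
print([r[4] for r in top if r[0] == "CA"])  # [0.4, 0.3, 0.2]
```

In this model, a parallel clause would correspond to giving different groups to different workers, while the sort and limit inside any one group stay sequential.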
There is, of course, no requirement that the
pipeline inside the foreach be a simple linear pipeline.
For example, if you wanted to calculate two distinct counts together
you could do:
--double_distinct.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray);
grpd = group divs all;
uniq = foreach grpd {
    exchanges = divs.exchange;
    uniq_exchanges = distinct exchanges;
    symbols = divs.symbol;
    uniq_symbols = distinct symbols;
    generate COUNT(uniq_exchanges), COUNT(uniq_symbols);
};
For simplicity, Pig actually runs the above
pipeline once for each expression in generate. In the
above example this has no side effects, because the two data flows are
completely disjoint. But if you constructed a pipeline where there
was a split in the flow and put a UDF in the shared portion, you would
find that it was invoked more often than you expected.
Using Different Join Implementations
When we covered join in the
previous chapter (see the section called “Join”) we discussed the
default join behavior. Pig offers multiple join implementations. We
will discuss the other implementations here.
Traditionally in RDBMS systems, the SQL
optimizer chooses a join implementation for the user. This is nice, as
long as the optimizer chooses well, which it does in most cases. But
Pig has taken a different approach. In the Pig team we like to say that
our optimizer is located between the user's chair and keyboard. We
empower the user to make these choices rather than having Pig make them.
So for operators such as join where there are multiple
implementations, Pig lets the user indicate which to use via a
using clause.
This approach fits well with our philosophy that Pigs are domestic animals (i.e., Pig does what you tell it; see the section called “Pig Philosophy”). Also, as a relatively new product, Pig has a lot of functionality to add. It makes more sense to focus on adding implementation choices and letting the user choose which ones to use rather than focusing on building an optimizer capable of choosing well.
Joining Small to Large Data
A common type of join is doing a lookup in a smaller input. Consider, for example, if you were processing data where you needed to translate a U.S. zip code (postal code) to the state and city it referred to. As there are at most 100,000 zip codes in the U.S., this translation table should easily fit in memory. Rather than forcing a reduce phase that will sort your big file plus this tiny zip code translation file, it makes sense instead to send the zip code file to every machine, load it into memory, and then do the join by streaming through the large file and looking up each record in the zip code file. This is called a fragment-replicate join (because you fragment one file and replicate the other).
--repljoin.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float,
            close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
jnd = join daily by (exchange, symbol), divs by (exchange, symbol)
            using 'replicated';
The using 'replicated' tells Pig
to use the fragment-replicate algorithm to execute this join. Since
no reduce phase is necessary, this can be done all in the
map task.
The second input listed in the join (in this
case, divs) is always the input that is loaded
into memory. Pig does not check beforehand that the specified input
will fit into memory. If Pig cannot fit the replicated input into
memory, it will issue an error and fail.
Warning
Due to the way Java stores objects in memory, the size of the data on disk will not be the size of the data in memory. See Memory Requirements of Pig Data Types for a discussion of how data expands in memory in Pig. You will need more memory for a replicated join than you need space on disk to store the replicated input.
Fragment-replicate join only supports inner and left outer join. It cannot do a right outer join because when a given map task sees a record in the replicated input that does not match any record in the fragmented input, it has no idea if it would match a record in a different fragment. So it does not know whether to emit a record or not. If you want right or full outer join you will need to use the default join operation.
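The following Python sketch shows the idea behind a fragment-replicate join under simplifying assumptions: the replicated input fits in a dictionary, keys are never null, and each call to join_fragment stands in for one map task. The function names and sample records are hypothetical, and this is not Pig's actual code.

```python
from collections import defaultdict

def build_lookup(replicated, key):
    """Load the small (replicated) input into an in-memory hash table."""
    table = defaultdict(list)
    for rec in replicated:
        table[key(rec)].append(rec)
    return table

def join_fragment(fragment, table, key, left_outer=False, null_width=0):
    """Stream one fragment of the large input past the in-memory table."""
    for rec in fragment:
        matches = table.get(key(rec), [])
        for m in matches:
            yield rec + m
        if left_outer and not matches:
            # An unmatched fragment record can be emitted right away,
            # padded with nulls for the replicated side.
            yield rec + (None,) * null_width

# Sample records invented for illustration: (exchange, symbol, value).
daily_fragment = [("NYSE", "CA", 10.0), ("NYSE", "ZZ", 5.0)]
divs = [("NYSE", "CA", 0.25)]
by_exchange_symbol = lambda r: (r[0], r[1])

table = build_lookup(divs, by_exchange_symbol)
inner = list(join_fragment(daily_fragment, table, by_exchange_symbol))
outer = list(join_fragment(daily_fragment, table, by_exchange_symbol,
                           left_outer=True, null_width=3))
```

Because each call sees only its own fragment, it can decide locally that a fragment record has no match, but it can never conclude that a record in the table is unmatched everywhere. That is the reason a right outer join does not fit this approach.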
Fragment-replicate join can be used with more than two tables. In this case all but the first (left-most) table are read into memory.
Pig implements the fragment-replicate join by loading the replicated input into Hadoop's distributed cache. The distributed cache is a tool Hadoop provides that pre-loads a file onto the local disk of nodes that will be executing the maps or reduces for that job. This has two important benefits. First, if you have a fragment-replicate join that is going to run on 1000 maps, opening one file in HDFS from 1000 different machines all at once puts a serious strain on the NameNode and the three data nodes that contain the block for that file. The distributed cache is built specifically to manage these kinds of issues without straining HDFS. Second, if multiple map tasks are located on the same physical machine, the files in the distributed cache are shared between those instances, thus reducing the number of times the file has to be copied.
Pig runs a map-only MapReduce job to
preprocess the file to get it ready for loading into the
distributed cache. If there is a filter or
foreach between the load and
join these will be done as part of this initial job so
that the file to be stored in the distributed cache is as small as
possible. The join itself will be done in a second map-only
job.
Joining Skewed Data
As we have seen elsewhere, much of the data
you will be processing with Pig has significant skew in the number of
records per key. For example, if you were building a map of the web
and joining by the domain of the URL (your key), you would expect to
see significant skew for values like yahoo.com.
Pig's default join algorithm is very sensitive to skew, since it
collects all of the records for a given key together on a single
reducer. In many data sets, there are a few keys that have three or
more orders of magnitude more records than other keys. This results
in one or two reducers that will take much longer than the rest. To
deal with this, Pig provides skew join.
Skew join works by first sampling one input for the join. In that input it identifies any keys that have so many records that skew join estimates it will not be able to fit them all into memory. Then, in a second MapReduce job, it does the join. For all records except those identified in the sample, it does a standard join, collecting records with the same key onto the same reducer. Those keys identified as too large are treated differently. Based on how many records were seen for a given key, those records are split across the appropriate number of reducers. The number of reducers is chosen based on Pig's estimate of how wide the data must be split such that each reducer can fit its split into memory. For the input to the join that is not split, the records for those keys that were split are replicated to each reducer that handles that key. This algorithm was proposed in the paper [David J. DeWitt et al. “Practical Skew Handling in Parallel Joins”].
For example, let's look at how the following Pig Latin script would work:
users = load 'users' as (name:chararray, city:chararray);
cinfo = load 'cityinfo' as (city:chararray, population:int);
jnd = join cinfo by city, users by city using 'skewed';
Assume that the cities in
users are distributed such that 20 users live in
Barcelona, 100,000 in New York, and 350 in Portland. Let's further
assume that Pig determined that it could fit 75,000 records into
memory on each reducer. When this data was joined, New York would be
identified as a key that needed to be split across reducers. During
the join phase all records with keys other than New York would be
treated as in a default join. Records from users
with New York as the key would be split between two separate reducers.
Records from cityinfo with New York as a key
would be duplicated and sent to both of those reducers.
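The arithmetic in this example can be reproduced with a few lines of Python. This is only a sketch of the reasoning: Pig's real estimate works from sampled record sizes and available heap, whereas the hypothetical function below simply takes the record counts and the 75,000-record capacity from the example as given.

```python
import math

def reducers_per_key(key_counts, records_per_reducer):
    """Return {key: reducer count} for keys too large for one reducer."""
    return {
        key: math.ceil(count / records_per_reducer)
        for key, count in key_counts.items()
        if count > records_per_reducer
    }

users_per_city = {"Barcelona": 20, "New York": 100_000, "Portland": 350}
splits = reducers_per_key(users_per_city, 75_000)
print(splits)  # {'New York': 2}
```

Only New York exceeds the capacity, and 100,000 records at 75,000 per reducer round up to two reducers, which is why the matching cityinfo records must be sent to both.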
The second input in the join, in this case,
users, is the one that will be sampled and have its
keys with a large number of values split across reducers. The first
input will have records with those values replicated across
reducers.
This algorithm only addresses skew in one input. If both inputs have skew, this algorithm will still work, but it will be slow. Much of the motivation behind this approach was that it guarantees that, given time, the join will still finish. Before Pig introduced skew join in version 0.4, data that was skewed on both sides could not be joined in Pig because it was not possible to fit all the records for the high cardinality key values in memory for either side.
Skew join can be done on inner or outer joins. However, it can only take two join inputs. Multi-way joins must be broken into a series of joins if they need to use skew join.
Since data often has skew, why not use skew join all of the time? There is a small performance penalty for using skew join, since one of the inputs must be sampled first to find any key values with a large number of records. This usually adds about 5% to the time it takes to calculate the join. If your data frequently has skew, it may be worth it to always use skew join and pay the 5% tax in order to avoid failing or running very slowly with the default join and then needing to rerun using skewed join.
As stated above, Pig estimates how much
data it can fit into memory when deciding which key values to split
and how wide to split them. For the purposes of this calculation Pig
looks at the record sizes in the sample and assumes it can use 30% of
the JVM's heap to materialize records that will be joined. In your
particular case you may find you need to increase or decrease this
size. You will want to decrease the value if your join is still
failing with out of memory errors even when using skew join. This
indicates that Pig is estimating memory usage improperly, so you
should tell it to use less. If profiling indicates that Pig is not
utilizing all of your heap, you may want to increase the value in
order to do the join more efficiently, since the fewer ways the key
values are split, the more efficient the join will be. You can do
that by setting the property
pig.skewedjoin.reduce.memusage to a value between 0 and
1. For example, if you wanted it to use 25% instead of 30%, you could
add -Dpig.skewedjoin.reduce.memusage=0.25 to your
Pig command line, or define the value in your properties file.
Warning
Like order, skew join
breaks the MapReduce convention that all records with the same key
will be processed by the same reducer. This means records with
the same key may be placed in separate part files. If you plan to
process the data in a way that depends on all records with the same
key being in the same part file then you cannot use skew
join.
Joining Sorted Data
A common database join strategy is to first sort both inputs on the join key, and then walk through both inputs together, doing the join. This is referred to as a sort-merge join. In MapReduce, since a sort requires a full MapReduce job, as does Pig's default join, this technique is not more efficient than the default. However, if your inputs are already sorted on the join key, this approach makes sense. The join can be done in the map phase by opening both files and walking through them. Pig refers to this as a merge join, since it is a sort-merge join, but the sort has already been done.
--mergejoin.pig
-- use sort_for_mergejoin.pig to build NYSE_daily_sorted and NYSE_dividends_sorted
daily = load 'NYSE_daily_sorted' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float,
            close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends_sorted' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
jnd = join daily by symbol, divs by symbol using 'merge';
To execute this join, Pig will first run a
MapReduce job that samples the second input,
NYSE_dividends_sorted. This sample builds an
index that tells Pig the value of the join key,
symbol, in the first record of every input split
(usually, each HDFS block). Since this sample reads only one record
per split, it runs very quickly. Pig will then run a second MapReduce
job that takes the first input, NYSE_daily_sorted
as its input. When each map reads the first record in its split of
NYSE_daily_sorted, it takes the value of
symbol and looks it up in the index built by the
previous job. It looks for the last entry that is less than its value
of symbol. It then opens
NYSE_dividends_sorted at the corresponding block
for that entry. For example, if the index contained entries:
(CA, 1), (CHY, 2), (CP, 3) and the first
symbol in a given map's input split of
NYSE_daily_sorted was CJA,
then that map would open block 2 of
NYSE_dividends_sorted. (Even if
CP was the first symbol in
NYSE_daily_sorted's split, block 2 of
NYSE_dividends_sorted would
be opened, as there may be records with a key of CP
in that block.) Once NYSE_dividends_sorted is
opened, Pig throws away records until it reaches a record with
symbol of CJA. Once it finds a
match, it collects all the records with that value into memory, and
then does the join. It then advances the first input,
NYSE_daily_sorted. If the key is the same, it
again does the join. If not, it advances the second input
NYSE_dividends_sorted again until it finds a
value greater than or equal to the next value in the first input,
NYSE_daily_sorted. If the value is greater, it
advances the first input and continues. Since both inputs are sorted
it never needs to look in the index after the initial lookup.
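The index lookup and the walk through both inputs can be sketched in Python as follows. This is a simplified model under stated assumptions: the index is a sorted in-memory list, both inputs are lists already sorted on the key, and matching records are rescanned rather than collected into memory as Pig does. The function names and sample records are hypothetical.

```python
from bisect import bisect_left

def block_to_open(index, first_key):
    """index: list of (first key in block, block number), sorted by key."""
    keys = [k for k, _ in index]
    # bisect_left counts the entries strictly less than first_key, so the
    # entry just before that position is the last one that is less.
    pos = bisect_left(keys, first_key) - 1
    # Assumption: if no entry is smaller, start from the first block.
    return index[max(pos, 0)][1]

def merge_join(left, right, key):
    """Inner-join two lists that are already sorted on key."""
    out, j = [], 0
    for l in left:
        # Throw away right-side records whose key is too small.
        while j < len(right) and key(right[j]) < key(l):
            j += 1
        # Pair this left record with every right record that has its key.
        k = j
        while k < len(right) and key(right[k]) == key(l):
            out.append(l + right[k])
            k += 1
    return out

index = [("CA", 1), ("CHY", 2), ("CP", 3)]
print(block_to_open(index, "CJA"))  # 2
print(block_to_open(index, "CP"))   # 2

# Sample records invented for illustration.
daily = [("CA", 1), ("CA", 2), ("CJA", 3)]
divs = [("CA", "x"), ("CHY", "y"), ("CJA", "z")]
joined = merge_join(daily, divs, key=lambda r: r[0])
```

Note that the right-side position j only ever moves forward, which is what lets the join finish in a single pass over each sorted input.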
Since all of this can be done without a reduce phase it is more efficient than a default join. This algorithm currently only supports two way joins. It also only supports inner joins. It was introduced in version 0.4.
Cogroup
cogroup is a
generalization of group. Instead of collecting records of
one input based on a key, it collects records of n
inputs based on a key. The result is a record with a key and one bag
for each input. Each bag contains all records from that input that have
the given value for the key.
A = load 'input1' as (id:int, val:float);
B = load 'input2' as (id:int, val2:int);
C = cogroup A by id, B by id;
describe C;
C: {group: int,A: {id: int,val: float},B: {id: int,val2: int}}
Another way to think of cogroup is
as being the first half of a join. The keys are collected together, but
the cross product is not done. In fact, cogroup plus
foreach where each bag is flattened is equivalent to join,
as long as there are no null values in the keys.
cogroup handles null values in the
keys like group, and unlike join. That is,
all records with a null value in the key will be collected
together.
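The relationship between cogroup and join can be modeled in Python. In this sketch, which assumes small in-memory inputs and does not model null keys, cogroup collects one bag per input for each key, and flattening both bags produces the rows an inner join would. The function names and sample records are invented for the example.

```python
from collections import defaultdict
from itertools import product

def cogroup(a, b, key_a, key_b):
    """Return {key: (bag of a records, bag of b records)}."""
    groups = defaultdict(lambda: ([], []))
    for rec in a:
        groups[key_a(rec)][0].append(rec)
    for rec in b:
        groups[key_b(rec)][1].append(rec)
    return dict(groups)

def flatten_both(groups):
    """Model of: foreach C generate flatten(A), flatten(B)."""
    return [ra + rb
            for bag_a, bag_b in groups.values()
            for ra, rb in product(bag_a, bag_b)]

# Sample records invented for illustration.
A = [(1, 1.5), (2, 2.5)]
B = [(1, 10), (1, 11), (3, 30)]
C = cogroup(A, B, key_a=lambda r: r[0], key_b=lambda r: r[0])
joined = flatten_both(C)
```

Keys 2 and 3 each have one empty bag, so the cross product for them is empty and they vanish from joined, exactly as unmatched keys vanish from an inner join.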
cogroup is useful when you want to
do join-like things, but not a full join. For example, Pig Latin does
not have a semi-join operator. However, you can do a semi-join:
--semijoin.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float,
            close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
grpd = cogroup daily by (exchange, symbol), divs by (exchange, symbol);
sjnd = filter grpd by not IsEmpty(divs);
final = foreach sjnd generate flatten(daily);
Since cogroup needs to collect
records with like keys together, it requires a reduce phase.
Union
Sometimes you want to put two data sets together not by joining them, but by concatenating them. Pig Latin provides union for this purpose. If you had two files you wanted to use for input, and there was no glob that could describe them, you could do:
A = load '/user/me/data/files/input1';
B = load '/user/someoneelse/info/input2';
C = union A, B;
Note
Unlike union in SQL, Pig does
not require that both inputs share the same schema. If both do share
the same schema, then the output of the union will have that schema. If
one schema can be produced from another by a set of implicit casts, then
the union will have that resulting schema. If neither of these
conditions hold, then the output will have no schema. That is,
different records will have different fields. This schema comparison
includes names, so even different field names will result in the output
having no schema. You can get around this by placing a
foreach before the union that renames
fields.
A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:float);
C = union A, B;
describe C;
C: {x: int,y: float}

A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:double);
C = union A, B;
describe C;
C: {x: int,y: double}

A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:chararray);
C = union A, B;
describe C;
Schema for C unknown.
Union does not perform a mathematical set union.
That is, duplicate records are not eliminated. In this manner it is
like SQL's union all.
Union does not require a separate reduce phase.
Sometimes your data changes over time. If you
have data you collect every month, you might add a new column this
month. Now you are prevented from using union because your
schemas do not match. If you want to union this data and force your
data into a common schema, you can add the keyword
onschema to your union
statement.
A = load 'input1' as (w: chararray, x:int, y:float);
B = load 'input2' as (x:int, y:double, z:chararray);
C = union onschema A, B;
describe C;
C: {w: chararray,x: int,y: double,z: chararray}
union onschema requires that all
inputs have schemas. It also requires that a shared schema for all
inputs can be produced by adding fields and implicit casts. Matching of
fields is done by name, not position. So in the above example
w:chararray is added from input1
and z:chararray is added from
input2. Also, a cast from float to
double is added for input1 so that
field y is a double. If a shared schema
cannot be produced by this method then an error is returned. When the
data is read nulls are inserted for fields not present in a given
input.
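Matching by name and filling in nulls can be modeled in a few lines of Python. This sketch handles only the field alignment; type reconciliation, such as the cast from float to double, is left out, and the function name and sample rows are assumptions made for the example.

```python
def union_onschema(*inputs):
    """inputs: (field_names, rows) pairs, where each row is a tuple.

    Fields are matched by name, not position. A field that an input
    lacks is filled with None for every row from that input.
    """
    schema = []
    for names, _ in inputs:
        for name in names:
            if name not in schema:
                schema.append(name)
    out = []
    for names, rows in inputs:
        for row in rows:
            rec = dict(zip(names, row))
            out.append(tuple(rec.get(name) for name in schema))
    return schema, out

# Sample rows invented for illustration.
A = (["w", "x", "y"], [("a", 1, 1.5)])
B = (["x", "y", "z"], [(2, 2.5, "b")])
schema, rows = union_onschema(A, B)
print(schema)  # ['w', 'x', 'y', 'z']
print(rows)    # [('a', 1, 1.5, None), (None, 2, 2.5, 'b')]
```

The resulting field order matches the describe output shown above: fields appear in the order in which they are first seen across the inputs.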
Cross
cross matches the
mathematical set operation of the same name. In the following Pig Latin
cross takes every record in NYSE_daily
and combines it with every record in
NYSE_dividends.
--cross.pig
-- you may want to run this in a cluster, it produces about 3G of data
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float,
            close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
tonsodata = cross daily, divs parallel 10;
cross tends to produce a lot of
data. Given inputs with n and m
records respectively, cross will produce output with
n x m records.
Pig does implement cross in a
parallel fashion. It does this by generating a synthetic join key,
replicating rows, and then doing the cross as a join. The above script
is rewritten to:
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float,
            close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
A = foreach daily generate flatten(GFCross(0, 2)), flatten(*);
B = foreach divs generate flatten(GFCross(1, 2)), flatten(*);
C = cogroup A by ($0, $1), B by ($0, $1) parallel 10;
tonsodata = foreach C generate flatten(A), flatten(B);
GFCross is an internal
UDF. The first argument is the input number, the second argument is the
total number of inputs. The output is a bag that contains, in this
example, four records.[17] These records have a
schema of (int, int). The field that is the same
number as the first argument to GFCross contains
a random number between zero and three. The other field counts from
zero to three. So if we assume for a given two records, one in each
input, the random number for the first input was 3
and for the second 2, then the outputs of
GFCross would look like:
A {(3, 0), (3, 1), (3, 2), (3, 3)}
B {(0, 2), (1, 2), (2, 2), (3, 2)}
When these records are flattened, four copies of each input record will be created in the map. They are then joined on the artificial keys. For every record in each input, it is guaranteed that there is one and only one instance of the artificial keys that will match and produce a record. Since the random numbers are chosen differently for each record, the resulting joins are done on an even distribution of the reducers.
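The guarantee of exactly one matching key can be checked with a small Python simulation. This is a sketch of the key-generation idea for the two-input case only, with the four values per field taken from the example above; the function gfcross is a stand-in written for illustration and is not Pig's GFCross code.

```python
import random

def gfcross(input_num, size=4, rng=random):
    """Return the bag of synthetic keys for one record of one input.

    Two-input case only: the field at position input_num holds a single
    random value, and the other field counts from 0 to size - 1.
    """
    r = rng.randrange(size)
    if input_num == 0:
        return [(r, i) for i in range(size)]
    return [(i, r) for i in range(size)]

keys_a = gfcross(0)   # for example [(3, 0), (3, 1), (3, 2), (3, 3)]
keys_b = gfcross(1)   # for example [(0, 2), (1, 2), (2, 2), (3, 2)]
common = set(keys_a) & set(keys_b)
print(len(common))    # 1
```

If the first record drew r and the second drew s, the only key present in both bags is (r, s), so every pair of input records meets exactly once, on a reducer chosen by two random numbers.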
This algorithm does enable crossing of data in
parallel. However, it creates a burden on the shuffle phase by
increasing the number of records in each input being shuffled. Also, no
matter what you do, cross outputs a lot of data. Writing
all of this data to disk is expensive, even when done in
parallel.
This is not to say you should not use
cross. There are places it is indispensable. Pig's
join operator only supports equi-joins, that is, joins on
an equality condition. Because general join implementations (ones that
do not depend on the data being sorted or small enough to fit in
memory) in MapReduce depend on collecting records with the same join
key values onto the same reducer, non-equi-joins (also called theta
joins) are difficult to do. They can be done in Pig using
cross followed by filter.
--thetajoin.pig
-- I recommend running this one on a cluster too
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float,
            close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
crossed = cross daily, divs;
tjnd = filter crossed by daily::date < divs::date;
Fuzzy joins could also be done in this manner, where the fuzzy comparison is done after the cross. However, whenever possible it is better to use a UDF to conform fuzzy values to a standard value and then do a regular join. For example, if you wanted to join two inputs on city, but you wanted to join any time two cities were in the same metropolitan area (e.g., you wanted “Los Angeles” and “Pasadena” to be viewed as equal), you could first run your records through a UDF that generated a single join key for all cities in a metropolitan area, and then do the join.
Integrating Pig with Legacy Code and MapReduce
One tenet of Pig's philosophy (see the section called “Pig Philosophy”) is that Pig allows users to integrate their own code with Pig wherever possible. The most obvious way Pig does that is through its UDFs. But it also allows you to directly integrate other executables and MapReduce jobs.
Stream
To specify an executable that you want to insert
into your data flow, use stream. You may want to do this
when you have a legacy program that you do not want or are unable to
change. You can also use stream when you have a program
you frequently use or have tested on small data sets and you want to
apply it to a large data set. Let's look at an example where you have a
Perl program highdiv.pl that filters out all stocks
with a dividend below $1.00.
--streamsimple.pig
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
highdivs = stream divs through `highdiv.pl` as (exchange, symbol, date, dividends);
Notice the as clause in the
stream command. This is not required. But Pig has no idea
what the executable will return. So if you do not provide the
as clause the relation highdivs will
have no schema.
The executable highdiv.pl
is invoked once on every map or reduce task. It is not invoked once per
record. Pig instantiates the executable and keeps feeding data to it
via stdin. It also keeps checking
stdout and passing any results to the next operator
in your data flow. The executable can choose whether to produce an
output for every input, or only every so many inputs, or only after all
inputs have been received.
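As a rough illustration of this contract, here is what a filter like highdiv.pl might look like if it were written in Python instead of Perl. This is a sketch under assumptions, not the actual script: we assume records arrive one per line with tab-separated fields, that dividends is the fourth field, and the function names are ours.

```python
import sys


def high_dividends(lines, threshold=1.00):
    """Yield input lines whose fourth tab-separated field is >= threshold."""
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        try:
            dividend = float(fields[3])
        except (IndexError, ValueError):
            continue  # skip malformed records rather than failing the task
        if dividend >= threshold:
            yield line if line.endswith("\n") else line + "\n"


def main(stdin=sys.stdin, stdout=sys.stdout):
    # One process handles every record of the task: read until end of input.
    for line in high_dividends(stdin):
        stdout.write(line)

# A real stream script would end by calling main().
```

Because the loop runs until stdin is exhausted, the process is started once per task and sees every record that task handles.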
The above example assumes that you already have
highdiv.pl installed on your grid and runnable
from the working directory on the task machines. If that is not the
case, which it usually will not be, you can ship the executable to the
grid. To do this you use a define statement.
--streamship.pig
define hd `highdiv.pl` ship('highdiv.pl');
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
highdivs = stream divs through hd as (exchange, symbol, date, dividends);
This define does two things. First,
it defines the executable that will be used. Now in stream
we refer to highdiv.pl by the alias we gave it,
hd, rather than referring to it directly. Second, it
tells Pig to pick up the file ./highdiv.pl and ship
it to Hadoop as part of this job. This file will be picked up from the
specified location on the machine where you launch the job. It will be placed
in the working directory of the task on the task machines. So the
command you pass to stream must refer to it relative to the
current working directory, not via an absolute path. If your executable
depends on other modules or files, they can be specified as part of the
ship clause as well. For example, if the
highdiv.pl depends on a Perl module
Financial.pm you can send them both to the task
machines.
define hd `highdiv.pl` ship('highdiv.pl', 'Financial.pm');
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
highdivs = stream divs through hd as (exchange, symbol, date, dividends);
Many scripting languages assume certain paths
for modules based on their hierarchy. For example, Perl expects to find
a module Acme::Financial in
Acme/Financial.pm. However, the ship
clause always puts files in your current working directory. And it does
not take directories; so you could not ship “Acme”. The
workaround for this is to create a tar file and ship that, and then have
a step in your executable that unbundles the tar file. You will then
need to set your module include path (for Perl, -I or
the PERLLIB environment variable) to contain
“.”.
ship moves files from the machine
you are launching your job from into the grid. But sometimes the file
you want is already in the grid. If you have a grid file that will be
accessed by every map or reduce task in your job, the proper way to
access it is via the distributed cache. The
distributed cache is a mechanism Hadoop provides to share files. It
reduces the load on HDFS by pre-loading the file to local disk on the
machine that will be executing the task. You can use the distributed
cache for your executable by using the cache clause in
define.
crawl = load 'webcrawl' as (url, pageid);
normalized = foreach crawl generate normalize(url);
define blc `blacklistchecker.py` cache('/data/shared/badurls#badurls');
goodurls = stream normalized through blc as (url, pageid);
The string before the “#” is the
path on HDFS, in this case /data/shared/badurls.
The string after the “#” is the name of the file as viewed
by the executable. So Hadoop will put a copy of
/data/shared/badurls into the task's working
directory and call it badurls.
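From the executable's point of view, the cached file is simply a local file in its working directory. The sketch below shows how a script such as blacklistchecker.py might use it. The script's real contents are not given here, so the file format (one URL per line), the position of the URL as the first tab-separated field, and both function names are assumptions.

```python
def load_blacklist(path="badurls"):
    """Read one URL per line from the file the cache clause placed locally."""
    with open(path) as f:
        return {line.strip() for line in f if line.strip()}


def good_records(lines, blacklist):
    """Yield tab-separated records whose first field (the URL) is allowed."""
    for line in lines:
        url = line.rstrip("\n").split("\t", 1)[0]
        if url not in blacklist:
            yield line
```

The script opens badurls by its short name, which is the part after the “#”, and never needs to know the HDFS path.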
So far we have assumed that your executable
takes data on stdin and writes it to
stdout. This may not work, depending on your
executable. If your executable needs a file to read from or to write
to, or both, you can specify that with the input and
output clauses in the define command.
Continuing with our previous example, let's say that
blacklistchecker.py expects to read its input from
a file specified by -i on its command line and write
to a file specified by -o.
crawl = load 'webcrawl' as (url, pageid);
normalized = foreach crawl generate normalize(url);
define blc `blacklistchecker.py -i urls -o good` input('urls') output('good');
goodurls = stream normalized through blc as (url, pageid);
Again, file locations are specified from the
working directory on the task machines. In this example, Pig will write
out all the input for a given task for
blacklistchecker.py to urls,
then invoke the executable, and then read good to
get the results. Again, the executable will only be invoked once per
map or reduce task, so Pig will first write out all of the input to the
file.
Mapreduce
Beginning in Pig 0.8, you can also include MapReduce jobs directly in your data flow with the mapreduce command. This is convenient if you have processing that is better done in MapReduce than Pig, but that must be integrated with the rest of your Pig data flow. It can also make it easier to incorporate legacy processing written in MapReduce with newer processing you want to write in Pig Latin.
MapReduce jobs expect to read their input from
and write their output to a storage device (usually HDFS). So to
integrate them with your data flow, Pig has to first store the data,
then invoke the MapReduce job, and then read the data back. This is
done via store and load clauses in the
mapreduce statement that invoke regular load and store
functions. You also provide Pig with the name of the jar that contains
the code for your MapReduce job.
As an example let's continue with the blacklisting of URLs that we considered in the previous section. Only now let's assume that this is done by a MapReduce job instead of a Python script.
crawl = load 'webcrawl' as (url, pageid);
normalized = foreach crawl generate normalize(url);
goodurls = mapreduce 'blacklistchecker.jar'
store normalized into 'input'
load 'output' as (url, pageid);
mapreduce takes as its first
argument the jar containing the code to run a MapReduce job. It uses
load and store phrases to specify how data
will be moved from Pig's data pipeline to the MapReduce job. Notice
that the input alias is contained in the store
clause. As with stream, the output of
mapreduce is opaque to Pig, so if we want the resulting
relation goodurls to have a schema we have to tell
Pig what it is. This example also assumes that the Java code in
blacklistchecker.jar knows which input and output
files to look for and has a default class to run specified in its
manifest. Often this will not be the case. Any arguments you wish to
pass to the invocation of the Java command that will run the MapReduce
task can be put in back quotes after the load
clause.
crawl = load 'webcrawl' as (url, pageid);
normalized = foreach crawl generate normalize(url);
goodurls = mapreduce 'blacklistchecker.jar'
store normalized into 'input'
load 'output' as (url, pageid)
`com.acmeweb.security.BlackListChecker -i input -o output`;
The string in the back quotes will be passed directly to your MapReduce job as is. So if you want to pass Java options, etc., you can do that as well.
The load and store
clauses of the mapreduce command have the same syntax as
the load and store statements. So you can use
different load and store functions, pass constructor arguments, etc.
See the section called “Load” and the section called “Store” for full
details.
Non-Linear Data Flows
So far our examples have been linear data flows or
trees. In a linear data flow, one input is loaded, processed, and stored.
We have looked at operators that combine multiple data flows:
join, cogroup, union, and
cross. With these you can build tree structures where
multiple inputs all flow to a single output. But in complex data
processing situations you often also want to split your data flow. That
is, one input will result in more than one output. You may also have
diamonds, places where the data flow is split and eventually joined back
together. Pig supports these directed acyclic graph (DAG) data
flows.
Splits in your data flow can be either implicit or
explicit. In an implicit split, no specific operator or syntax is
required in your script. You simply refer to a given relation multiple
times. As an example, consider our
baseball example data. You might want
to analyze players by position and, at the same time, by team.
--multiquery.pig
players = load 'baseball' as (name:chararray, team:chararray,
position:bag{t:(p:chararray)}, bat:map[]);
pwithba = foreach players generate name, team, position,
bat#'batting_average' as batavg;
byteam = group pwithba by team;
avgbyteam = foreach byteam generate group, AVG(pwithba.batavg);
store avgbyteam into 'by_team';
flattenpos = foreach pwithba generate name, team,
flatten(position) as position, batavg;
bypos = group flattenpos by position;
avgbypos = foreach bypos generate group, AVG(flattenpos.batavg);
store avgbypos into 'by_position';
The pwithba relation is
referred to by the group operators for both the
byteam and bypos relations. Pig
builds a data flow that takes every record from pwithba and
ships it to both group operators.
Splitting of data flows can also be done explicitly via the split operator. It allows you to split your data flow as many ways as you like. Let's take an example where you want to split data into different files depending on the date the record was created.
wlogs = load 'weblogs' as (pageid, url, timestamp);
split wlogs into apr03 if timestamp < '20110404',
apr02 if timestamp < '20110403' and timestamp > '20110401',
apr01 if timestamp < '20110402' and timestamp > '20110331';
store apr03 into '20110403';
store apr02 into '20110402';
store apr01 into '20110401';
At first glance split looks like a
switch or case statement, but it is
not. A single record can go to multiple legs of the split
since you use different filters for each if clause. In the example
above, a record dated 20110402 goes to both apr03 and apr02. And a
record can go to no leg. In the example above, a record with
a date of 20110404 or later would be dropped. And there is
no default clause; no way to send any leftover records to a particular
alias.
split is semantically identical to an
implicit split that uses filters. The above example could be rewritten
as:
wlogs = load 'weblogs' as (pageid, url, timestamp);
apr03 = filter wlogs by timestamp < '20110404';
apr02 = filter wlogs by timestamp < '20110403' and timestamp > '20110401';
apr01 = filter wlogs by timestamp < '20110402' and timestamp > '20110331';
store apr03 into '20110403';
store apr02 into '20110402';
store apr01 into '20110401';
In fact, Pig will internally rewrite the original
script that has split in exactly this way.
Let's take a look at how Pig executes these
non-linear data flows. Whenever possible, it combines them into single
MapReduce jobs. This is referred to as a
multi-query. In cases where all operators will fit
into a single map task, this is easy. Pig creates separate pipelines
inside the map and sends the appropriate records to each pipeline. The
example above using split to store data by date will be
executed in this way.
Pig can also combine multiple group
operators together in many cases. In the example given at the beginning
of the section where the baseball data is grouped by both team and
position, this entire Pig Latin script will be executed inside one
MapReduce job. Pig accomplishes this by duplicating records on the map
side and annotating each record with its pipeline number. When the data
is partitioned during the shuffle the appropriate key is used for each
record. That is, records from the pipeline grouping by
team will use team as their shuffle
key and records from the pipeline grouping by position
will use position as their shuffle key. This is done
by declaring the key type to be tuple and placing the correct values in
the key tuple for each record. Once the data has been collected to
reducers, the pipeline number is used as part of the sort key so that
records from each pipeline and group are collected together. In the
reduce task Pig instantiates multiple pipelines, one for each group
operator. It sends each record down the appropriate pipeline based on its
annotated pipeline number. In this way input data can be scanned once,
but grouped many different ways. An example of how one record flows
through this pipeline is shown in Figure 6.1, “Multi-query Illustration”.
While this does not provide linear speed up, we find it often approaches
it.
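The mechanics described above can be mimicked in a few lines of Python. This is a toy model of the idea, not Pig's code: the three function names are ours, pipeline 0 stands for the group by team, pipeline 1 for the group by position, and the shuffle is modeled as an in-memory dictionary.

```python
from collections import defaultdict


def map_phase(players):
    """Emit each record once per pipeline, tagged with its pipeline number."""
    for name, team, positions, batavg in players:
        yield (0, team), batavg            # pipeline 0: group by team
        for position in positions:         # the flatten feeds pipeline 1
            yield (1, position), batavg    # pipeline 1: group by position


def shuffle(pairs):
    """Collect values by (pipeline, key), sorted so pipelines stay together."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(sorted(groups.items()))


def reduce_phase(groups):
    """Send each group down its own pipeline and compute the average."""
    outputs = {0: {}, 1: {}}
    for (pipeline, key), values in groups.items():
        outputs[pipeline][key] = sum(values) / len(values)
    return outputs
```

The input is read once in map_phase, yet reduce_phase produces two independent results, one per group operator.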
There are cases where Pig will not combine
multiple operators into a single MapReduce job. It does not use
multi-query for any of the multiple input operators: join,
union, cross, or cogroup. It does
not use multi-query for order statements either. Also, if it
has multiple group statements and some would use Hadoop's
combiner and some would not, it only combines those statements that use
Hadoop's combiner into a multi-query. This is because we have found that
combining Hadoop combiner and non-Hadoop combiner jobs together does not
perform well.
While multi-query scripts tend to perform better
than loading the same input multiple times, multi-query does have limits. Since it
requires replicating records in the map it does slow down the shuffle
phase. Eventually the increased cost of the shuffle phase outweighs the
reduced cost of rescanning the input data. Pig has no way to estimate
when this will occur. Currently the optimizer is optimistic and always
combines jobs with multi-query whenever it can. If it combines too many
jobs and becomes slower than splitting some of the jobs, you can turn off
multi-query or you can rewrite your Pig Latin into separate scripts so Pig
does not attempt to combine them all. To turn off multi-query you can pass
either -M or -no_multiquery on the
command line or set the property opt.multiquery to
false.
We must also consider what happens when one job in
a multi-query fails, but others succeed. If all jobs succeed, then Pig
will return 0, meaning success. If all of the jobs fail, Pig will return
2. If some jobs fail and some succeed, then Pig will return 3. By
default, if one of the jobs fails, Pig will continue processing the other
jobs. However, if you want Pig to stop as soon as one of the jobs fails,
you can pass -F or -stop_on_failure.
In this case any jobs that have not yet been finished will be terminated,
and any that have not started will not be started. Any jobs that are
already finished will not be cleaned up.
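If you launch Pig from a wrapper script, these return codes are what the wrapper has to interpret. The following Python sketch shows one way to do that. Only the codes 0, 2, and 3 come from the text above; the function names and the publishing policy are hypothetical.

```python
# Meanings of the return codes described in this section.
PIG_RETURN_CODES = {
    0: "all jobs succeeded",
    2: "all jobs failed",
    3: "some jobs failed and some succeeded",
}


def describe_return_code(code):
    """Translate a Pig return code into a short message."""
    return PIG_RETURN_CODES.get(code, "return code not covered in this section")


def should_publish_outputs(code):
    """Hypothetical policy: use the outputs only when every job succeeded."""
    return code == 0
```

A wrapper could obtain the code from something like subprocess.run(["pig", "-F", "script.pig"]).returncode and then decide whether to clean up partial output, since Pig does not remove the results of jobs that already finished.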
Controlling Execution
In addition to providing many relational and data flow operators Pig Latin provides ways for you to control how your jobs execute on MapReduce. It allows you to set values that control your environment and to control details of MapReduce such as how your data is partitioned.
Set
The set command is used
to set the environment in which Pig runs the MapReduce jobs. Table 6.1, “Pig Specific Set Parameters” shows Pig specific parameters that can be
controlled via set.
Table 6.1. Pig Specific Set Parameters
| Parameter | Value Type | Description |
|---|---|---|
| debug | string | Sets the logging level to DEBUG. Equivalent to passing -debug DEBUG on the command line. |
| default_parallel | integer | Sets a default parallel level for all reduce operations in the script. See the section called “Parallel” for details. |
| job.name | string | Assigns a name to the Hadoop job. By default the name is the filename of the script being run, or a randomly generated name for interactive sessions. |
| job.priority | string | If your Hadoop cluster is using the Capacity Scheduler with priorities enabled for queues, this allows you to set the priority of your Pig job. Allowed values are very_low, low, normal, high, very_high. |
For example, to set the default parallelism of your Pig Latin script and to set the job name to “my_job”:
set default_parallel 10;
set job.name my_job;
users = load 'users';
In addition to these predefined values,
set can be used to pass Java property settings to Pig and
Hadoop. Both Pig and Hadoop use a number of Java properties to control
their behavior. Consider an example where you want to turn multi-query
off for a given script, and you want to tell Hadoop to use a higher
value for its map side sort buffer than usual.
set opt.multiquery false;
set io.sort.mb 2048; --give it 2G
You can also use this mechanism to pass
properties to UDFs. All of the properties are passed to the tasks on
the Hadoop nodes when they are executed. They are not set as Java
properties in that environment. Rather they are placed in a Hadoop
object called JobConf. UDFs have access to the
JobConf. Thus anything you set in the script can
be seen by your UDFs. This can be a convenient way to control UDF
behavior. For information on how to retrieve this information in your
UDFs see the section called “Constructors and Passing Data from Front End to Back End”.
Values that are set in your script are global for the whole script. If they are reset later in the script, that second value will overwrite the first and be used through the whole script.
Setting the Partitioner
Hadoop uses a class called
Partitioner to partition records to reducers
during the shuffle phase. For details on partitioners see the section called “Shuffle Phase”. Pig does not override the default
partitioner, except for order and skew join. The balancing
operations in these require special
Partitioners.
Beginning in version 0.8, Pig allows you to set
the partitioner, except in the cases where it is already overriding it.
To do this you need to tell Pig the Java class to use to partition your
data. This class must extend Hadoop's
org.apache.hadoop.mapreduce.Partitioner<KEY,VALUE>.
Note that this is the newer (version 0.20 and later)
mapreduce API and not the older
mapred.
register acme.jar; --jar containing the partitioner
users = load 'users' as (id, age, zip);
grp = group users by id partition by com.acme.userpartitioner parallel 100;
Operators that reduce data can take the
partition clause: cogroup, cross,
distinct, group, and join (again,
not in conjunction with skew join).
Pig Latin Preprocessor
Pig Latin has a preprocessor that runs before your
Pig Latin script is parsed. In 0.8 and before this provided parameter
substitution, roughly similar to a very simple version of
#define in C. Starting with 0.9 it also provides inclusion
of other Pig Latin scripts and function-like macro definitions, so that
you can write Pig Latin in a modular way.
Parameter Substitution
Pig Latin scripts that are frequently used often have elements that need to change based on when or where they are run. A script that is run every day is likely to have a date component in its input files or filters. Rather than edit and change the script every day, you want to pass in the date as a parameter. Parameter substitution provides this capability with a basic string replacement functionality. Parameters must start with a letter or an underscore and can then have any number of letters, numbers, or underscores. Values for the parameters can be passed in on the command line or from a parameter file.
--daily.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
date:chararray, open:float, high:float, low:float, close:float,
volume:int, adj_close:float);
yesterday = filter daily by date == '$DATE';
grpd = group yesterday all;
minmax = foreach grpd generate MAX(yesterday.high), MIN(yesterday.low);
When you run daily.pig you
must provide a definition for the parameter DATE, or
you will get an error telling you that you have undefined
parameters.
pig -p DATE=2009-12-17 daily.pig
You can repeat the -p command
line switch as many times as needed. Parameters can also be placed in a
file, which is convenient if you have more than a few of them. The
format of the file is parameter=value, one per line.
Comments in the file should be preceded by a #. You
then indicate the file to be used with -m or
-param_file.
pig -param_file daily.params daily.pig
Parameters passed on the command line take precedence over parameters provided in files. This way you can provide all your standard parameters in a file and override a few as needed on the command line.
Parameters can contain other parameters. So, for example, you could have the following parameter file:
#Param file
YEAR=2009-
MONTH=12-
DAY=17
DATE=$YEAR$MONTH$DAY
A parameter must be defined before
it is referenced. The above parameter file would produce an error if the
DAY line came after the DATE line.
The other caveat is that there is no special character to delimit the
end of a parameter. Any alphanumeric or underscore character will be
interpreted as part of the parameter, and any other character will be
interpreted as itself. So, if you had a script that ran at the first of
every month you could not do the following:
wlogs = load 'clicks/$YEAR$MONTH01' as (url, pageid, timestamp);
This would try to resolve a parameter
MONTH01 when you meant
MONTH.
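Both caveats fall out naturally if you think of the preprocessor as a simple pattern replacement. The Python sketch below models that behavior. It is a deliberate simplification and not Pig's preprocessor: the regular expression, the function names, and the error message are ours, and features such as escaping are left out.

```python
import re

# A parameter name: a letter or underscore, then letters, digits, underscores.
PARAM = re.compile(r"\$([A-Za-z_][A-Za-z0-9_]*)")


def substitute(text, params):
    """Replace every $NAME in text, failing on any undefined parameter."""
    def lookup(match):
        name = match.group(1)
        if name not in params:
            raise KeyError("Undefined parameter: " + name)
        return params[name]
    return PARAM.sub(lookup, text)


def read_param_lines(lines):
    """Parse NAME=value lines in order; values may use earlier names only."""
    params = {}
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # blank line or comment
        name, value = line.split("=", 1)
        params[name.strip()] = substitute(value.strip(), params)
    return params
```

Because the pattern consumes every trailing letter, digit, and underscore, $MONTH01 is read as one name. And because each line is resolved against only the parameters defined so far, moving DAY below DATE fails.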
When using parameter substitution all parameters
in your script must be resolved after the preprocessor is finished. If
not, Pig will issue an error message and not continue. You can see the
results of your parameter substitution by using the
-dryrun flag on the Pig command line. Pig will write
out a version of your Pig Latin script with the parameter substitution
done, but it will not execute the script.
You can also define parameters inside your Pig
Latin script using %declare and
%default. %declare allows you to
define a parameter in the script itself. %default is
useful to provide a common default value that can be overridden when
needed. Consider a case where most of the time your script is run on
one Hadoop cluster, but occasionally it is run on a different cluster
with different hardware.
%default parallel_factor 10;
wlogs = load 'clicks' as (url, pageid, timestamp);
grp = group wlogs by pageid parallel $parallel_factor;
cntd = foreach grp generate group, COUNT(wlogs);
When running your script in the usual
configuration there is no need to set the parameter
parallel_factor. On the occasions it is run in a
different setup the parallel factor can be changed by passing a value on
the command line.
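The layering of values can be pictured as a chain of lookups. The Python sketch below models only the ordering described in this section: command line first, then the parameter file, then %default. It is an assumption-laden model, not Pig's code; the function name is ours, and %declare is intentionally left out.

```python
from collections import ChainMap


def resolve_params(command_line, param_file, defaults):
    """Earlier maps win: command line, then parameter file, then %default."""
    return dict(ChainMap(command_line, param_file, defaults))
```

With defaults of {"parallel_factor": "10"} and nothing else, the script runs with 10. Passing {"parallel_factor": "50"} as the command line map changes the result to 50 without editing the script.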
Macros
Starting in 0.9, Pig added the ability to define macros. This makes it possible to make your Pig Latin scripts modular. It also makes it possible to share segments of Pig Latin code between users. This can be particularly useful for defining standard practices and making sure all data producers and consumers use them.
Macros are declared with the define
statement. A macro takes a set of input parameters. These are string
values that will be substituted for the parameters when the macro is
expanded. By convention input relation names are placed first before
other parameters. The output relation name is given in a
returns statement. The operators of the macro are enclosed
in {} (braces). Anywhere the parameters, including
the output relation name, are referenced inside the macro they must be
preceded by a $ (dollar sign). The macro
is then invoked in your Pig Latin by assigning it to a relation.
--macro.pig
-- Given daily input and a particular year, analyze how
-- stock prices changed on days dividends were paid out.
define dividend_analysis (daily, year, daily_symbol, daily_open, daily_close)
returns analyzed {
divs = load 'NYSE_dividends' as (exchange:chararray,
symbol:chararray, date:chararray, dividends:float);
divsthisyear = filter divs by date matches '$year-.*';
dailythisyear = filter $daily by date matches '$year-.*';
jnd = join divsthisyear by symbol, dailythisyear by $daily_symbol;
$analyzed = foreach jnd generate dailythisyear::$daily_symbol,
$daily_close - $daily_open;
};
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
date:chararray, open:float, high:float, low:float, close:float,
volume:int, adj_close:float);
results = dividend_analysis(daily, '2009', 'symbol', 'open', 'close');
It is also possible to have a macro that does
not return a relation. In this case the returns clause of
the define statement is changed to returns
void. This can be useful if you want to define a macro that
controls how data is partitioned and sorted before being stored to a
particular output such as HBase or a database.
These macros are expanded in-line. This is
where an important difference between macros and functions becomes
apparent. Macros cannot be invoked recursively. Macros can invoke
other macros, so a macro A can invoke a macro
B. But A cannot invoke itself.
And once A has invoked B,
B cannot invoke A. Pig will
detect these loops and throw an error.
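Detecting such loops amounts to finding a cycle in the graph of which macro invokes which. The Python sketch below shows one standard way to do that with a depth-first search. It illustrates the check, not how Pig implements it; the function name and the dictionary representation of macro calls are assumptions.

```python
def find_macro_cycle(calls):
    """Return a list of macro names forming a cycle, or None if there is none.

    calls maps each macro name to the names of the macros it invokes.
    """
    visiting, done = [], set()

    def visit(name):
        if name in done:
            return None
        if name in visiting:
            # name is already on the current call path: we found a loop.
            return visiting[visiting.index(name):] + [name]
        visiting.append(name)
        for callee in calls.get(name, ()):
            cycle = visit(callee)
            if cycle:
                return cycle
        visiting.pop()
        done.add(name)
        return None

    for macro in calls:
        cycle = visit(macro)
        if cycle:
            return cycle
    return None
```

A invoking B is fine, but as soon as B invokes A the search reports the path A, B, A, which is the situation in-line expansion could never finish.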
Parameter substitution (see the section called “Parameter Substitution”) cannot be used inside of macros. Parameters should be explicitly passed to macros and parameter substitution used only at the top level.
You can use the -dryrun
command line argument to see how the macros are expanded in-line. When
the macros are expanded the alias names are changed to avoid collisions
with alias names in the place the macro is being expanded. If we take
the above example and use -dryrun to show us the
resulting Pig Latin we will see (reformatted slightly to fit on the
page):
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
date:chararray, open:float, high:float, low:float, close:float,
volume:int, adj_close:float);
macro_dividend_analysis_divs_0 = load 'NYSE_dividends' as (exchange:chararray,
symbol:chararray, date:chararray, dividends:float);
macro_dividend_analysis_divsthisyear_0 =
filter macro_dividend_analysis_divs_0 BY (date matches '2009-.*');
macro_dividend_analysis_dailythisyear_0 = filter daily BY (date matches '2009-.*');
macro_dividend_analysis_jnd_0 =
join macro_dividend_analysis_divsthisyear_0 by (symbol),
macro_dividend_analysis_dailythisyear_0 by (symbol);
results = foreach macro_dividend_analysis_jnd_0 generate
macro_dividend_analysis_dailythisyear_0::symbol, close - open;
As you can see, the aliases in the macro are expanded with a combination of the macro name and the invocation number. This provides a unique key so that if other macros use the same aliases or the same macro is used multiple times, there is still no duplication.
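The naming pattern visible in the -dryrun output can be written down as a small function. This Python sketch reproduces only the pattern shown above. The function names and the passthrough dictionary are ours, and Pig's actual renaming code may differ in its details.

```python
def expanded_alias(macro_name, alias, invocation):
    """Build the alias a macro-internal relation gets after expansion."""
    return "macro_{}_{}_{}".format(macro_name, alias, invocation)


def rename_aliases(macro_name, aliases, invocation, passthrough):
    """Map each alias used in a macro body to its name after expansion.

    passthrough holds names supplied by the caller (the input relations
    and the returned relation), which keep the caller's names.
    """
    return {
        alias: passthrough.get(
            alias, expanded_alias(macro_name, alias, invocation))
        for alias in aliases
    }
```

For the first invocation of dividend_analysis, jnd becomes macro_dividend_analysis_jnd_0, while $daily and $analyzed are replaced by the caller's daily and results. A second invocation would use the suffix 1, so its aliases cannot collide with those of the first.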
Including other Pig Latin Scripts
For a long time in Pig Latin, the entire script needed to be in one file. This produced some rather unpleasant multi-thousand line Pig Latin scripts. Starting in 0.9 the preprocessor can be used to include one Pig Latin script in another. Taken together with the macros (also added in 0.9, see the section called “Macros”) it is now possible to write modular Pig Latin that is easier to debug and re-use.
import is used to include one Pig Latin script in another.
--main.pig
import '../examples/ch6/dividend_analysis.pig';
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
date:chararray, open:float, high:float, low:float, close:float,
volume:int, adj_close:float);
results = dividend_analysis(daily, '2009', 'symbol', 'open', 'close');
Import writes the imported file directly into
your Pig Latin script in place of the import statement. In the above
example the contents of dividend_analysis.pig will
be placed immediately before the load statement. Note that
a file cannot be imported twice. If you wish to use the same
functionality multiple times you should write it as a macro and import
the file with that macro.
In the above example we used a relative path for
the file to be included. Fully qualified paths can also be used. By
default relative paths are taken from the current working directory of
Pig when you launch the script. You can set a search path by setting
the pig.import.search.path property. This is a
comma-separated list of paths that will be searched for your files. The
current working directory, “.” is always in the search
path.
set pig.import.search.path '/usr/local/pig,/grid/pig';
import 'acme/macros.pig';
Imported files are not in separate namespaces. This means that all macros are in the same namespace, even when they have been imported from separate files. Thus care should be taken to choose unique names for your macros.
[15] Those with database experience will notice that this is a violation of the first normal form as defined by E.F. Codd. This intentional de-normalization of data is very common in OLAP systems in general, and in large data processing systems like Hadoop in particular. RDBMS systems tend to make joins common and then work to optimize them. In systems like Hadoop, where storage is cheap and joins are expensive, it is generally better to use nested data structures to avoid the joins.
[16] In versions 0.8
and previous, there is a bug where this flatten is
assigned a schema of one field which is a bytearray
instead of causing the schema to be null. This bug has been fixed in
0.9.
[17] In 0.8 and before the number of records is always ten. In 0.9 this is changed to be the square root of the parallel factor, rounded up.





