Chapter 9. Embedding Pig Latin in Python
Pig Latin is a dataflow language. Unlike general-purpose
programming languages, it does not include control flow constructs
like if and for. For many data processing
applications the operators Pig provides are sufficient. But there are
classes of problems that require the data flow either to be repeated an
indefinite number of times or to branch based on the results of
an operator. Iterative processing, where a calculation needs to be repeated
until the margin of error is within an acceptable limit, is an example. It
is not possible to know beforehand how many times the data flow will need
to be run.
Blending data flow and control flow in one language is difficult to do in a way that is useful and intuitive. Building a general-purpose language and all the associated tools, such as IDEs and debuggers, is a considerable undertaking, and there is no lack of such languages already. Turning Pig Latin into a general-purpose language would also require users to learn a much bigger language to process their data. For these reasons we decided to embed Pig in existing scripting languages. This avoids the need to invent a new language while still providing users the features they need to process their data.[20]
As with UDFs, we chose Python for the initial release of embedded Pig, in version 0.9. The embedding interface is a Java class, so a Jython interpreter is used to run the Python scripts that embed Pig. This means Python 2.5 features can be used, but Python 3 features cannot. In the future we hope to extend the system to other scripting languages that can access Java objects, such as JavaScript[21] and JRuby. Of course, since the Pig infrastructure is all in Java, it is possible to use this same interface to embed Pig in Java programs.
This embedding is done in a JDBC-like style, where
your Python script first compiles a Pig Latin script, then binds variables
from Python to it, and finally runs it. It is also possible to do file
system operations, register jars, and perform other utility operations through
the interface. The top level class for this interface is
org.apache.pig.scripting.Pig.
Throughout this chapter we will use an example of
calculating page rank from a web crawl. You can find this example under
examples/ch9 in the example code. This code iterates
over a set of URLs and links to produce a page rank for each
URL.[22] The input
to this example is the webcrawl data set found in the
examples. Each record in this input contains a URL, a starting rank of
1, and a bag with a tuple for each link found at that
URL.
http://pig.apache.org/privacypolicy.html 1 {(http://www.google.com/privacy.html)}
http://www.google.com/privacypolicy.html 1 {(http://www.google.com/faq.html)}
http://desktop.google.com/copyrights.html 1 {}
Even though control flow is done via a Python script,
it can still be run using Pig's bin/pig script.
bin/pig looks for the #! line and calls the
appropriate interpreter. This allows you to use these scripts with systems
that expect to invoke a Pig Latin script. It also allows Pig to include
UDFs from this file automatically, and to give correct line numbers for
error messages.
In order to use the Pig class and related objects, the code must first import them into the Python script.
from org.apache.pig.scripting import *
Compile
Calling the static method Pig.compile
causes Pig to do an initial compilation of the code. Since we have not
bound the variables yet, this check cannot completely verify the script.
Type checking and other semantic checks are not done at this phase; only
the syntax is checked. compile returns a
Pig object that can be bound to a set of
variables.
# pagerank.py
P = Pig.compile("""
previous_pagerank = load '$docs_in' as (url:chararray, pagerank:float,
                    links:{link:(url:chararray)});
outbound_pagerank = foreach previous_pagerank generate
                    pagerank / COUNT(links) as pagerank,
                    flatten(links) as to_url;
cogrpd = cogroup outbound_pagerank by to_url,
         previous_pagerank by url;
new_pagerank = foreach cogrpd generate group as url,
               (1 - $d) + $d * SUM (outbound_pagerank.pagerank)
                   as pagerank,
               flatten(previous_pagerank.links) as links,
               flatten(previous_pagerank.pagerank) AS previous_pagerank;
store new_pagerank into '$docs_out';
nonulls = filter new_pagerank by previous_pagerank is not null and
          pagerank is not null;
pagerank_diff = foreach nonulls generate ABS (previous_pagerank - pagerank);
grpall = group pagerank_diff all;
max_diff = foreach grpall generate MAX (pagerank_diff);
store max_diff into '$max_diff';
""")
The only thing in this Pig Latin script that we
have not seen before is the four parameters, marked in the script as
$d, $docs_in, $docs_out, and
$max_diff. The syntax for these parameters is the same as
for parameter substitution. However, Pig expects these to be supplied by
the control flow script when bind is called.
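As a rough mental model of what supplying these parameters means, the sketch below uses the Python standard library's string.Template, which happens to share the $name syntax. It is an analogy only, not how Pig implements binding; the shortened script text and the helper name bind_text are hypothetical and exist only for this illustration.

```python
from string import Template

# Hypothetical, shortened script; not the chapter's full page rank script.
SCRIPT = Template("""
A = load '$docs_in' as (url:chararray, pagerank:float);
B = foreach A generate url, (1 - $d) + $d * pagerank;
store B into '$docs_out';
""")


def bind_text(template, params):
    """Return the script text with every $name replaced by its value.

    Template.substitute raises KeyError if a parameter has no value,
    which loosely mirrors an unbound parameter being an error.
    """
    return template.substitute(params)


bound_text = bind_text(SCRIPT, {
    'd': '0.5',
    'docs_in': 'data/webcrawl',
    'docs_out': 'out/pagerank_data_1',
})
```

Note that the same parameter ($d here) can appear more than once in the script, and each occurrence receives the same value.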
There are three other compilation methods in
addition to the one shown in this example. compile(String name,
String script) takes a name in addition to the Pig Latin to be
compiled. This name can be used in other Pig Latin code blocks to import
this block.
P1 = Pig.compile("initial", """
    A = load 'input';
    ...
""")
P2 = Pig.compile("""
    import initial;
    B = load 'more_input';
    ...
""")
The other two compilation methods are called
compileFromFile. These take the same arguments as
compile, but they expect the script argument to refer to a file
containing the script, rather than the script itself.
Bind
Once your script has been successfully compiled,
the next step is to bind variables in the control flow to variables in Pig
Latin. In our example script this is done by providing a map to the
bind call. The keys are the names of the variables in Pig
Latin. The values in this example are literal string values that are
updated as the script progresses. They could also be references to Python
variables.
# pagerank.py
params = { 'd': '0.5', 'docs_in': 'data/webcrawl' }
for i in range(10):
    out = "out/pagerank_data_" + str(i + 1)
    max_diff = "out/max_diff_" + str(i + 1)
    params["docs_out"] = out
    params["max_diff"] = max_diff
    # Remove output left over from any earlier run
    Pig.fs("rmr " + out)
    Pig.fs("rmr " + max_diff)
    bound = P.bind(params)
    stats = bound.runSingle()
    if not stats.isSuccessful():
        raise RuntimeError('failed')
    mdv = float(str(stats.result("max_diff").iterator().next().get(0)))
    print "max_diff_value = " + str(mdv)
    if mdv < 0.01:
        print "done at iteration " + str(i)
        break
    # This iteration's output is the next iteration's input
    params["docs_in"] = out
For the initial run, the Pig Latin parameter $d
will take on the value 0.5,
$docs_in the filename data/webcrawl,
$docs_out out/pagerank_data_1,
and $max_diff out/max_diff_1.
bind returns a
BoundScript object. This object can be run,
explained, described, or illustrated. As is shown in this script, a
single Pig object can be bound multiple times.
A compile is only necessary on the first pass, with different values
being bound to it each time.
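To make the arithmetic behind this loop concrete, here is a minimal pure-Python sketch of the same compile-once, rebind-and-rerun pattern, with no Pig or Hadoop involved. The function names pagerank_step and iterate are hypothetical. One simplifying assumption differs from the Pig Latin script: a page with no inbound links is given the rank (1 - d) here, whereas in the script SUM over an empty bag yields null and such records are filtered out before the difference is computed.

```python
def pagerank_step(pages, d=0.5):
    """Run one pass. pages maps url -> (rank, list of outbound urls)."""
    # Each page splits its current rank evenly among its outbound links.
    inbound = {}
    for url, (rank, links) in pages.items():
        for to_url in links:
            inbound[to_url] = inbound.get(to_url, 0.0) + rank / len(links)

    new_pages = {}
    max_diff = 0.0
    for url, (rank, links) in pages.items():
        # Same formula as the script: (1 - d) + d * SUM(inbound shares).
        new_rank = (1 - d) + d * inbound.get(url, 0.0)
        max_diff = max(max_diff, abs(rank - new_rank))
        new_pages[url] = (new_rank, links)
    return new_pages, max_diff


def iterate(pages, d=0.5, tolerance=0.01, max_iterations=10):
    """Feed each pass's output into the next until the ranks settle."""
    for i in range(max_iterations):
        pages, max_diff = pagerank_step(pages, d)
        if max_diff < tolerance:
            return pages, i + 1
    return pages, max_iterations


# Tiny hypothetical crawl: page a links to page b, and b links nowhere.
final_pages, passes = iterate({'a': (1.0, ['b']), 'b': (1.0, [])})
```

In the embedded version, the body of pagerank_step is what the bound Pig Latin does on the cluster, and the surrounding loop is the part that stays in Python.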
In our example bind is given a
mapping of the variables to bind. If all of your Python variables and Pig
Latin variables have the same name, then you can call bind
with no arguments. This will cause bind to look in the
Python context for variables of the same name as the parameters in Pig and
use them. If it cannot find appropriate variables, then it will throw an
error. We could change our example script to look like this:
# pagerankbindnoarg.py
d = 0.5
docs_in = 'data/webcrawl'
for i in range(10):
    docs_out = "out/pagerank_data_" + str(i + 1)
    max_diff = "out/max_diff_" + str(i + 1)
    Pig.fs("rmr " + docs_out)
    Pig.fs("rmr " + max_diff)
    # With no arguments, bind looks up d, docs_in, docs_out, and max_diff
    # among the Python variables of the same names
    bound = P.bind()
    stats = bound.runSingle()
    if not stats.isSuccessful():
        raise RuntimeError('failed')
    mdv = float(str(stats.result("max_diff").iterator().next().get(0)))
    print "max_diff_value = " + str(mdv)
    if mdv < 0.01:
        print "done at iteration " + str(i)
        break
    docs_in = docs_out
Binding Multiple Sets of Variables
Our example page rank script binds its compiled Pig Latin to different variables multiple times in order to iterate over the data. Each of these jobs is run separately, as is required by the iterative nature of calculating page rank. However, sometimes you want to run a set of jobs together. Consider calculating census data from countries all over the world. You want to run the same Pig Latin for each country. But you do not want to run them separately. There is no point in having a massively parallel system like Hadoop if you are going to run jobs one at a time. You want to tell Pig to take your script and run it against input from all the countries at the same time.
There is a form of bind that
provides this capability. Instead of taking a map of parameters, it
takes a list of maps of parameters. It still returns a single
BoundScript object. But when run is
called on this object all of the separate instantiations of the script
will be run together.
#!/usr/bin/python
from org.apache.pig.scripting import *
pig = Pig.compile("""
    input = load '$country' using CensusLoader();
    ...
    store output into '$country_out';
""")
params = [{'country': 'Afghanistan', 'country_out': 'af.out'},
          ...
          {'country': 'Zimbabwe', 'country_out': 'zw.out'}]
bound = pig.bind(params)
stats = bound.run()
Run
Once we have our
BoundScript object we can call
runSingle to run it. This tells Pig to run a single Pig
Latin script. This is appropriate when you have bound your script to just
one set of variables. runSingle returns a
PigStats object. This object allows you to get your
results and to examine what happened in your script, including status, error
codes, and messages if there was an error, as well as statistics
about the run itself. Table 9.1, “PigStats Methods” summarizes
the more important methods available for
PigStats.
Table 9.1. PigStats Methods
| Method | Returns |
|---|---|
| result(String alias) | Given an alias, returns an OutputStats object that describes the output stored from that alias. You can get a results iterator from OutputStats. |
| isSuccessful() | True if all went well, false otherwise. |
| getReturnCode() | Returns the return code from running Pig. See Table 2.1, “Pig Return Codes” for return code details. |
| getErrorMessage() | Returns the error message if the run failed. This will try to pick the most relevant error message that was returned, most likely the last. |
| getAllErrorMessages() | Returns a list of all the error messages if the run failed. |
| getOutputLocations() | Returns a list of location strings that were stored to in the script. For example, if you wrote output to a file on HDFS, this will return the filename. |
| getOutputNames() | Returns a list of aliases that were stored in the script. |
| getRecordWritten() | Returns the total number of records written by the script. |
| getBytesWritten() | Returns the total number of bytes written by the script. |
| getNumberRecords(String location) | Given an output location, returns the number of records written to that location. |
| getNumberBytes(String location) | Given an output location, returns the number of bytes written to that location. |
| getDuration() | Wall clock time it took the script to run. |
| getNumberJobs() | Number of MapReduce jobs run by this script. |
As seen in the example the
OutputStats object returned by
result() can be used to get an iterator
on the result set. With this you can iterate through the tuples of your
data processing result in your Python script. Standard
Tuple methods such as get() can be
used to inspect the contents of each record. See the section called “Interacting with Pig Values” for a discussion of working with
Tuples. Based on the results read in the iterator
your Python script can decide whether to cease iteration and declare
success, raise an error, or continue with another iteration.
Warning
For this iterator to work, the store
function you use to store results from the alias must
also be a load function. Pig attempts to use the same class to load the
results as was used to store them. The default PigStorage
works well for this.
Running Multiple Bindings
If you bound your Pig
object to a list of maps of parameters, then rather than calling
runSingle you should call run. This will
cause Pig to start a thread for each binding and run it. All these jobs
will be submitted to Hadoop at the same time, making use of Hadoop's
parallelism. run returns a list of
PigStats objects. The
PigStats objects are guaranteed to be in the same
order in the list as the maps of bound variables passed to
bind. Thus the results of
the first binding map are in the first position of the
PigStats list, and so on.
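The one-thread-per-binding behavior and the ordering guarantee can be pictured with a small Pig-free sketch. This is plain Python using the standard threading module, not Pig's implementation; run_all and fake_job are hypothetical names, and fake_job merely stands in for running one bound script and returning its statistics.

```python
import threading


def run_all(job, bindings):
    """Run job once per binding, each in its own thread.

    Results come back in the same order as bindings, regardless of
    which thread happens to finish first.
    """
    results = [None] * len(bindings)

    def worker(index, params):
        # Each thread writes only to its own slot, so order is preserved.
        results[index] = job(params)

    threads = [threading.Thread(target=worker, args=(i, p))
               for i, p in enumerate(bindings)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


def fake_job(params):
    # Stand-in for running one bound script; returns its output name.
    return params['country_out']


bindings = [{'country': 'Afghanistan', 'country_out': 'af.out'},
            {'country': 'Zimbabwe', 'country_out': 'zw.out'}]
outputs = run_all(fake_job, bindings)
```

Because each result is stored by index, the caller can pair outputs[i] with bindings[i] in the same way the text describes pairing each PigStats object with its binding map.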
Utility Methods
In addition to the compile,
bind, and run methods presented so far there are
also utility methods provided by Pig and
BoundScript.
File system operations can be done by calling the
static method Pig.fs. The string passed to it should be a
valid string for use in the Grunt shell. See Chapter 3, Grunt.
The return code from running the shell command will be returned.
You can use register,
define, and set in your compiled Pig Latin
statements as in non-embedded Pig Latin. However, you may have a jar you
wish to register, a function alias to define, or a value to set that you
want to be in effect for all your Pig Latin code blocks. In this case
you can use the static methods of Pig described in
Table 9.2, “Pig Utility Methods”. The registers, defines,
and sets done by these methods will affect all Pig Latin code compiled
after they are called.
Table 9.2. Pig Utility Methods
| Method | Arguments | Pig Latin Equivalent |
|---|---|---|
| registerJar(String jarfile) | jarfile - jar to register | register jarfile; |
| registerUDF(String udffile, String namespace) | udffile - UDF file to register; namespace - namespace to place UDF in | register udffile using jython as namespace; |
| define(String alias, String definition) | alias - name of definition; definition - string being aliased | define alias definition; |
| set(String variable, String value) | variable - variable to set; value - value to set variable to | set variable value; |
# register etc. will not affect this block
p1 = Pig.compile("...")
Pig.registerJar("acme.jar")
Pig.registerUDF("acme_python.py", "acme")
Pig.define("d_to_e", "com.acme.financial.CurrencyConverter('dollar', 'euro')")
Pig.set("default_parallel", "100")
# register etc. will affect p2 and p3
p2 = Pig.compile("...")
p3 = Pig.compile("...")
Once a script has been bound and a
BoundScript returned, in addition to running the
script, you can also call describe, explain, or
illustrate. These do exactly what they would if it were a
non-embedded Pig Latin script. However, they do not return the resulting
output to your script; it is dumped to standard output. (These operators
are intended for use in debugging rather than for returning data
directly to your script.)
[20] In some of the documentation, wiki pages, and issues on JIRA, embedded Pig is referred to as Turing Complete Pig. This was what the project was called when it first started, even though we did not make Pig itself Turing complete.
[21] There is already an experimental version of JavaScript in 0.9.
[22] The example code was graciously provided by Julien Le Dem.




