Chapter 1. Introduction
What Is Pig?
Pig provides an engine for executing data flows in parallel on Hadoop. It includes a language, Pig Latin, for expressing these data flows. Pig Latin includes operators for many of the traditional data operations (join, sort, filter, etc.) as well as the ability for users to develop their own functions for reading, processing, and writing data.
Pig is an Apache open source project. This means users are free to download it as source or binary, use it for themselves, contribute to it, and under the terms of the Apache License, use it in their products and change it as they see fit.
Pig on Hadoop
Pig runs on Hadoop. It makes use of both the Hadoop Distributed File System (HDFS) and Hadoop's processing system, MapReduce.
HDFS is a distributed file system that stores files across all of the nodes in a Hadoop cluster. It handles breaking the files into large blocks and distributing them across different machines, including making multiple copies of each block so that if any one machine fails, no data is lost. It presents a POSIX-like interface to users. By default, Pig reads input files from HDFS, uses HDFS to store intermediate data between MapReduce jobs, and writes its output to HDFS. As you will see later, it can also read input from and write output to sources other than HDFS; see Chapter 11, Writing Load and Store Functions.
MapReduce is a simple but powerful parallel data processing paradigm. Every job in MapReduce consists of three main phases: map, shuffle, and reduce. In the map phase, the application has the opportunity to operate on each record in the input separately. Many maps are started at once, so that while the input may be gigabytes or terabytes in size, given enough machines the map phase can usually be completed in under one minute.
Part of the specification of a MapReduce job is the key on which data will be collected. For example, if you were processing web server logs for a website that required users to log in, you might choose the user ID as your key so that you could see everything done by each user on your website. In the shuffle phase, which happens after the map phase, data is collected together by the key the user has chosen and distributed to different machines for the reduce phase. Every record for a given key will go to the same reducer.
In the reduce phase the application is presented with each key, together with all of the records containing that key. Again this is done in parallel on many machines. After processing each group the reducer can write its output. See Example 1.1, “MapReduce 1, hello 1, world 1” for a walkthrough of a simple MapReduce program. For more details on how MapReduce works, see the section called “MapReduce”.
Example 1.1. MapReduce 1, hello 1, world 1
Consider a simple MapReduce application that counts the number of times each word appears in a given text. This is the “hello world” program of MapReduce. In this example the map phase will read each line in the text one at a time. It will then split out each word into a separate string, and for each word output the word and a 1 to indicate it has seen the word one time. The shuffle phase will use the word as the key, hashing the records to reducers. The reduce phase will then sum up the number of times each word was seen and write that together with the word as output. Let's consider the case of the nursery rhyme “Mary had a Little Lamb”. Our input will be:
Mary had a little lamb
its fleece was white as snow
and everywhere that Mary went
the lamb was sure to go.
Let's assume that each line is sent to a different map task. In reality each map is assigned much more data than this, but it makes the example easier to follow. The data flow through MapReduce is shown in Figure 1.1, “Map Reduce Illustration”.
Once the map phase is complete the shuffle phase will collect all records with the same word onto the same reducer. For this example we assume that there are two reducers: all words that start with A-L are sent to the first reducer, and M-Z are sent to the second reducer. The reducers will then output the summed counts for each word.
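To make the shuffle concrete, here is roughly what the records would look like at each stage for the first reducer, ignoring punctuation and case for simplicity (the formatting is illustrative, not Hadoop's actual representation):

map output (all maps):  (Mary,1) (had,1) (a,1) (little,1) (lamb,1) (its,1) (fleece,1) ...
first reducer's input:  (a,{1}) (and,{1}) (as,{1}) (everywhere,{1}) (fleece,{1}) (go,{1}) (had,{1}) (its,{1}) (lamb,{1,1}) (little,{1})
first reducer's output: (a,1) (and,1) (as,1) (everywhere,1) (fleece,1) (go,1) (had,1) (its,1) (lamb,2) (little,1)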
Pig uses MapReduce to execute all of its data processing. It compiles the Pig Latin scripts that users write into a series of one or more MapReduce jobs that it then executes. See Example 1.2, “Pig counts Mary and her lamb” for a Pig Latin script that will do a word count of “Mary had a Little Lamb”.
Example 1.2. Pig counts Mary and her lamb
This Pig Latin produces the same output as Example 1.1, “MapReduce 1, hello 1, world 1”. There is no need to be concerned with map, shuffle, and reduce phases when using Pig. It will manage decomposing the operators in your script into the appropriate MapReduce phases.
-- Load input from the file named Mary, and call the single
-- field in the record 'line'.
input = load 'mary' as (line);
-- TOKENIZE splits the line into a field for each word.
-- flatten will take the collection of records returned by
-- TOKENIZE and produce a separate record for each one, calling the single
-- field in the record word.
words = foreach input generate flatten(TOKENIZE(line)) as word;
-- Now group them together by each word.
grpd = group words by word;
-- Count them.
cntd = foreach grpd generate group, COUNT(words);
-- Print out the results.
dump cntd;
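If you run this script against the rhyme, dump prints one tuple per word. Note that TOKENIZE does not strip punctuation and is case sensitive, so “go.” keeps its period and “Mary” keeps its capital letter. The output, whose order is not guaranteed, will contain tuples such as:

(a,1)
(had,1)
(its,1)
(Mary,2)
(lamb,2)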
Pig Latin, a Parallel Dataflow Language
Pig Latin is a dataflow language. This means it allows users to describe how data from one or more inputs should be read, processed, and then stored to one or more outputs in parallel. These data flows can be simple linear flows like the word count example given above. They can also be complex workflows that include points where multiple inputs are joined and where data is split into multiple streams to be processed by different operators. To be mathematically precise, a Pig Latin script describes a directed acyclic graph (DAG) where the edges are data flows and the nodes are operators that process the data.
This means that Pig Latin looks different from many of the programming languages you have seen. There are no if statements or for loops in Pig Latin. This is because traditional procedural and object-oriented programming languages describe control flow, and data flow is a side effect of the program. Pig Latin instead focuses on data flow. For information on how to integrate the data flow described by a Pig Latin script with control flow, see Chapter 9, Embedding Pig Latin in Python.
Comparing dataflow and query languages
After a cursory look, people often say that Pig Latin is a procedural version of SQL. While there are certainly similarities, there are more differences. SQL is a query language. Its focus is to allow users to form a query: it lets a user describe what question they want answered, but not how they want it answered. In Pig Latin, on the other hand, the user describes exactly how to process the input data.
Another major difference is that SQL is oriented around answering one question. When users want to do several data operations together, they must either write separate queries, storing the intermediate data in temporary tables, or write one query with subqueries inside it to do the earlier steps of the processing. However, many SQL users find subqueries confusing and difficult to form properly. Also, using subqueries creates an inside-out design where the first step in the data pipeline is the innermost query.
Pig, however, is designed with a long series of data operations in mind, so there is no need to write the data pipeline in an inverted set of subqueries or to worry about storing data in temporary tables. This is illustrated in Example 1.3, “Group then join in SQL and Pig Latin”.
Example 1.3. Group then join in SQL and Pig Latin
Consider a case where a user wants to group one table on a key and then join it with a second table. Since joins happen before grouping in a SQL query, this must either be expressed as a subquery or as two queries with the results stored in a temporary table. This example will do it with a temporary table as that is more readable.
CREATE TEMP TABLE t1 AS
SELECT customer, sum(purchase) AS total_purchases
FROM transactions
GROUP BY customer;

SELECT customer, total_purchases, zipcode
FROM t1, customer_profile
WHERE t1.customer = customer_profile.customer;
In Pig Latin, on the other hand, this looks like:
-- Load the transactions file, group it by customer, and
-- sum their total purchases
txns = load 'transactions' as (customer, purchase);
grouped = group txns by customer;
total = foreach grouped generate group, SUM(txns.purchase) as tp;
-- Load the customer_profile file
profile = load 'customer_profile' as (customer, zipcode);
-- Join the grouped and summed transactions and customer_profile data
answer = join total by group, profile by customer;
-- Write the results to the screen
dump answer;
Another difference is the environments each was designed to live in. SQL is designed for the RDBMS environment, where data is normalized, schemas are enforced, and proper constraints are in place (that is, there are no nulls in places they do not belong, etc.). Pig, on the other hand, is designed for the Hadoop data processing environment, where schemas are sometimes unknown or inconsistent, data may not be properly constrained, and it is rarely normalized. As a result of these differences, Pig does not require data to first be loaded into tables; it can operate on data as soon as it is copied into HDFS.
An analogy with human languages and cultures may help. My wife and I have been to France together a couple of times. I speak very little French. But because English is the language of commerce (and probably because Americans and British like to vacation in France) there is enough English spoken in France for me to get by. My wife, on the other hand, speaks French. She has friends there to visit. She can talk to people we meet. She can explore the parts of France that are not part of the common tourist itinerary. Her experience of France is much deeper than mine because she can speak the native language.
SQL is the English of data processing. It has the nice feature that everyone and every tool knows it. This means the barrier to adoption is very low. Our goal is for Pig Latin to be the native language of parallel data processing systems such as Hadoop. It may take some learning, but it will allow users to much more fully utilize the power of Hadoop.
How Pig differs from MapReduce
I have just made a claim that a goal of the Pig team is to make Pig Latin the native language of parallel processing environments like Hadoop. But does MapReduce not provide enough? Why is Pig necessary?
Pig provides users with several advantages over using MapReduce directly. Pig Latin provides all of the standard data processing operations, such as join, filter, group by, order by, union, etc. MapReduce provides group by directly (that is what the shuffle plus reduce phases are), and it provides order by indirectly through the way it implements the grouping. Filter and projection can be implemented trivially in the map phase. But other operators, particularly join, are not provided and must instead be written by the user.
Pig provides complex, nontrivial implementations of these standard data operations. For example, because the number of records per key in a dataset is rarely evenly distributed, the data sent to the reducers is often skewed. That is, one reducer may get ten or more times the data of other reducers. Pig has join and order by operators that handle this case and (in some cases) rebalance the reducers. These took the Pig team months of engineering to write, and rewriting them yourself in MapReduce would be similarly time consuming.
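For example, asking Pig to handle a skewed join is a one-keyword change to the script. Here is a minimal sketch, reusing the hypothetical transaction and profile inputs from the earlier example; 'skewed' is the keyword Pig recognizes for this join strategy:

-- hypothetical inputs; some customers have far more transactions than others
txns    = load 'transactions' as (customer, purchase);
profile = load 'customer_profile' as (customer, zipcode);
-- 'skewed' tells Pig to sample the keys and spread heavily loaded
-- ones across multiple reducers instead of overloading one
jnd     = join txns by customer, profile by customer using 'skewed';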
In MapReduce, the data processing inside the map and reduce phases is opaque to the system. This means that MapReduce has no opportunity to optimize or check the user's code. Pig, on the other hand, can analyze a Pig Latin script and understand the data flow that the user is describing. That means it can do early error checking (did the user try to add a string field to an integer field?) and optimizations (can these two grouping operations be combined?).
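As a small illustration of that early error checking, consider the following sketch (the file and field names are hypothetical). Pig's type checker rejects the script when it is parsed, before any MapReduce job is launched:

-- name is declared as a chararray (string) and num as an int
A = load 'data' as (name:chararray, num:int);
-- error: Pig refuses to add a chararray to an int
B = foreach A generate name + num;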
MapReduce does not have a type system. This is intentional, and it gives users the flexibility to use their own data types and serialization frameworks. But the downside is that this further limits the system's ability to check users' code for errors both before and during runtime.
All of these points mean that Pig Latin is much lower cost to write and maintain than Java code for MapReduce. In one very unscientific experiment I wrote the same operation in Pig Latin and MapReduce, as shown in Example 1.4, “Finding the Top Five Urls”.
Example 1.4. Finding the Top Five Urls
Given one file with user data and one with click data for a website, this Pig Latin script will find the five pages most visited by users between the ages of 18 and 25.
Users = load 'users' as (name, age);
Fltrd = filter Users by age >= 18 and age <= 25;
Pages = load 'pages' as (user, url);
Jnd = join Fltrd by name, Pages by user;
Grpd = group Jnd by url;
Smmd = foreach Grpd generate group, COUNT(Jnd) as clicks;
Srtd = order Smmd by clicks desc;
Top5 = limit Srtd 5;
store Top5 into 'top5sites';
The first line of this program loads the file users and declares that this data has two fields: name and age. It assigns the name Users to the input. The second line applies a filter to Users that passes through records with an age between 18 and 25, inclusive. All other records are discarded. Now the data has only records of users in the age range we are interested in. The results of this filter are named Fltrd.

The second load statement loads pages and names it Pages. It declares its schema to have two fields, user and url. The line Jnd = join joins together Fltrd and Pages, using Fltrd.name and Pages.user as the key. After this join we have found, for each user, all the URLs they have visited.

The line Grpd = group collects records together by URL. So for each value of url, such as pignews.com/frontpage, there will be one record with a collection of all records that have that value in the url field. The next line then counts how many records are collected together for each URL. So after this line we know, for each URL, how many times it was visited by users aged 18-25.

The next thing to do is to sort this from most visits to least. The line Srtd = order does this. It sorts on the count value from the previous line and places it in desc (descending) order, so the largest value will be first. Finally, we need only the top five pages, so the last line limits the sorted results to five records. The results are then stored back to HDFS in the file top5sites.
In Pig Latin this comes to nine lines of code and took about fifteen minutes to write and debug. The same code in MapReduce (omitted here for brevity) came out to about 170 lines of code and took me four hours to get working. The Pig Latin will similarly be easier to maintain as future developers can easily understand and modify this code.
There is, of course, a cost to all this. It is possible to develop algorithms in MapReduce that cannot easily be done in Pig, and the developer gives up a level of control. A good engineer can always, given enough time, write code that will outperform a generic system. So for less common algorithms or extremely performance-sensitive ones, MapReduce is still the right choice. Basically this is the same situation as choosing to code in Java versus a scripting language like Python: Java has more power, but due to its lower-level nature it requires more development time. Developers will need to choose the right tool for each job.
What Is Pig Useful For?
In my experience, Pig Latin use cases tend to fall into three separate categories: traditional extract transform load (ETL) data pipelines, research on raw data, and iterative processing.
The largest use case is data pipelines. A common example is web companies bringing in logs from their web servers, cleansing the data, and precomputing common aggregates before loading it into their data warehouse. In this case the data is loaded onto the grid and Pig is used to clean out records from bots and records with corrupt data. It is also used to join web event data against user databases so that user cookies can be connected with known user information.
Another example of data pipelines is using Pig offline to build behavior prediction models. Pig is used to scan through all the user interactions with a website and split the users into various segments. Then for each segment a mathematical model is produced that predicts how members of that segment will respond to types of advertisements or news articles. In this way the website can show ads that are more likely to get clicked on or news stories that are more likely to engage users and keep them coming back to the site.
Traditionally, ad hoc queries are done in languages like SQL, which make it easy to quickly form a question for the data to answer. However, for research on raw data some users prefer Pig Latin. Because Pig can operate in situations where the schema is unknown, incomplete, or inconsistent, and because it can easily manage nested data, researchers who want to work on data before it has been cleaned and loaded into the warehouse often prefer Pig. Researchers who work with large data sets are often used to scripting languages such as Perl or Python, and users with these backgrounds often prefer the dataflow paradigm of Pig over the declarative query paradigm of SQL.
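This flexibility shows up directly in the language: a Pig Latin script can load a file without declaring any schema at all and refer to fields by position. A minimal sketch, with a hypothetical log file:

-- no schema declared; fields are referenced positionally as $0, $1, ...
raw  = load 'weblogs';
pair = foreach raw generate $0, $1; -- keep just the first two fields
dump pair;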
Users building iterative processing models are also starting to use Pig. Consider a news website that keeps a graph of all news stories on the web that it is tracking. In this graph each news story is a node, and edges indicate relationships between the stories. For example, all stories about an upcoming election are linked together. Every five minutes a new set of stories comes in, and the data processing engine must integrate them into the graph. Some of these stories are new, some are updates of existing stories, and some supersede existing stories. Some data processing steps need to operate on this entire graph of stories. For example, a process that builds a behavioral targeting model needs to join user data against the entire graph of stories. Rerunning the entire join every five minutes is not feasible, since it cannot be completed in five minutes with a reasonable amount of hardware. But the model builders do not want to update these models only daily, as that means an entire day of missed serving opportunities.
To cope with this problem, it is possible to first do a join against the entire graph on a regular basis, for example daily. Then, as new data comes in every five minutes, a join can be done with just the new incoming data, and the results can be combined with the results of the join against the whole graph. This combination step takes some care, as the five-minute data contains the equivalent of inserts, updates, and deletes on the entire graph. It is possible and reasonably convenient to express this combination in Pig Latin.
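A minimal sketch of what that combination might look like in Pig Latin, assuming hypothetical files holding the results of the daily full join and of the five-minute delta join, and treating a delta record as superseding the daily record for the same key (true deletes would need an extra filter step):

daily = load 'daily_join' as (user, story, weight);
delta = load 'delta_join' as (user, story, weight);
-- cogroup collects the daily and delta records for each key side by side
grpd   = cogroup daily by (user, story), delta by (user, story);
-- where a delta record exists it wins; otherwise keep the daily record
merged = foreach grpd generate flatten((IsEmpty(delta) ? daily : delta));
store merged into 'current_join';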
One point that is implicit in everything said so far is that Pig (like MapReduce) is oriented around batch processing of data. If you need to process gigabytes or terabytes of data, Pig is a good choice. But it expects to read all the records of a file and write all of its output sequentially. For workloads that require writing single or small groups of records, or looking up many different records in random order, Pig (like MapReduce) is not a good choice. See the section called “NoSQL Databases” for a discussion of applications that are good for these use cases.
Pig Philosophy
Early on, the Pig team discovered that people who came to the Pig project as potential contributors did not always understand what the project was about. This led to confusion on their part: they were not sure how best to contribute, or which contributions would be accepted and which would not. So the team produced a statement of the project's philosophy that summarizes what Pig aspires to be:
Pigs eat anything: Pig can operate on data whether it has metadata or not. It can operate on data that is relational, nested, or unstructured. And it can easily be extended to operate on data beyond files, including key/value stores, databases, etc.
Pigs live anywhere: Pig is intended to be a language for parallel data processing. It is not tied to one particular parallel framework. It has been implemented first on Hadoop, but we do not intend that to be only on Hadoop.
Pigs are domestic animals: Pig is designed to be easily controlled and modified by its users.
Pig allows integration of user code wherever possible, so it currently supports user defined field transformation functions, user defined aggregates, and user defined conditionals. These functions can be written in Java or in scripting languages that can compile down to Java (e.g., Jython). Pig supports user provided load and store functions. It supports external executables via its stream command and MapReduce jars via its mapreduce command. It allows users to provide a custom partitioner for their jobs in some circumstances and to set the level of reduce parallelism for their jobs. A brief sketch of some of these extension points appears after this list.
Pig has an optimizer that rearranges some operations in Pig Latin scripts to give better performance, combines MapReduce jobs together, etc. However, users can easily turn this optimizer off to prevent it from making changes that do not make sense in their situation.
Pigs fly: Pig processes data quickly. We want to consistently improve performance, and not implement features in ways that weigh Pig down so it can't fly.
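To make the “domestic animals” point concrete, here is a brief sketch of what a few of these extension points look like in a script. The jar, class, and Perl script names are hypothetical:

register myudfs.jar;                                   -- make user code available to the script
logs  = load 'logs' as (line:chararray);
norm  = foreach logs generate myudfs.Normalize(line);  -- a user defined function
clean = stream norm through `perl clean.pl`;           -- pipe records through an external executable
store clean into 'cleaned';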
Pig's History
Pig started out as a research project in Yahoo! Research, where Yahoo! scientists designed it and produced an initial implementation. As explained in a paper presented at SIGMOD in 2008 [Christopher Olston et al., “Pig Latin: A Not-So-Foreign Language for Data Processing”, available at http://portal.acm.org/citation.cfm?id=1376726], the researchers felt that the MapReduce paradigm presented by Hadoop “is too low-level and rigid, and leads to a great deal of custom user code that is hard to maintain, and reuse.” At the same time they observed that many MapReduce users were not comfortable with declarative languages such as SQL. Thus they set out to produce “a new language called Pig Latin that we have designed to fit in a sweet spot between the declarative style of SQL, and the low-level, procedural style of map-reduce.”
Yahoo! Hadoop users started to adopt Pig, so a team of development engineers was assembled to take the research prototype and build it into a production-quality product. About this same time, in the fall of 2007, Pig was open sourced via the Apache Incubator. The first Pig release came a year later, in September 2008. Later that same year Pig graduated from the Incubator and became a subproject of Apache Hadoop.
Early in 2009, other companies started to use Pig for their data processing, and Amazon added Pig as part of its Elastic MapReduce service. By the end of 2009 about half of the Hadoop jobs at Yahoo! were Pig jobs. In 2010 Pig adoption continued to grow, and Pig graduated from being a Hadoop subproject to become its own top-level Apache project.
Code Examples in this Book
Many of the example scripts, User Defined Functions (UDFs), and data sets used in this book are available for download from my GitHub repository. README files are included to help you get the UDFs built and to understand the contents of the data files. Each example script in the text that is available on GitHub has a comment at the beginning giving the filename. Pig Latin and Python script examples are organized by chapter in the examples directory. UDFs, both Java and Python, are in a separate directory, udfs. All of the data sets are in the data directory.
For brevity each script is written assuming that the input and output is in the local directory. Therefore, when in local mode you should run Pig in the directory that the input data is in. When running on a cluster, you should place the data in your home directory on the cluster.
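For example, with a hypothetical script named wordcount.pig, the two ways of running it might look like this (-x local is Pig's flag for local mode):

pig -x local wordcount.pig    # local mode: reads input files from the current directory
pig wordcount.pig             # MapReduce mode: reads input from your HDFS home directory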
Example scripts were tested against Pig 0.8.0 or 0.8.1, except those that use functionality new in 0.9. These were run against builds from the 0.9 branch, since 0.9 was not released until much of the book had been written.
The three data sets used in the examples are real data sets, though quite small. The file baseball contains baseball player statistics. The second set is New York Stock Exchange data, in two files, NYSE_daily and NYSE_dividends. This data was trimmed to stock symbols starting with C from the year 2009 in order to make it small enough to download easily; however, the schema of the data has not changed. If you want to download the entire data set and place it on a cluster (only a few nodes would be necessary), it would make for a more realistic demonstration of Pig and Hadoop. Instructions on how to download the data are in the README files. The third data set is a very brief web crawl started from Pig's web page.