Chapter 6. Data Access
Like any web server, Node needs access to data stores for persistent storage; without persistence all you have is a brochure website, which would make using Node pointless. In this chapter we'll run through the basic ways to connect to common open source database choices and to store and retrieve data.
NoSQL and Document Stores
NoSQL databases and document stores are increasingly popular for web-facing applications and are easy to use with Node.
CouchDB
CouchDB provides MVCC[15]-based document storage in a JavaScript environment. When documents (records) are added or updated in CouchDB, the entire document is saved to storage as a new version, and older versions of that data are marked obsolete. Older versions of a record can still be merged into the newest version, but in every case a whole new version is created and written contiguously to disk for faster read times. CouchDB is said to be "eventually consistent": in a large, scalable deployment, multiple instances can sometimes serve older, unsynced versions of records to clients, with the expectation that any changes to those records will eventually be propagated to every instance.
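To make the versioning idea concrete, here is a minimal in-memory sketch of MVCC-style storage, loosely modeled on CouchDB's _id and _rev fields. It is not how CouchDB is implemented: the RevisionStore name, the plain counter used as a revision (CouchDB's real revisions look like "3-67a7..."), and the in-memory version history are all assumptions made for illustration. Each save writes a complete new version, the previous one is kept but marked obsolete, and an update that names a stale revision is rejected as a conflict.

```javascript
// Hypothetical in-memory sketch of MVCC-style document storage, loosely
// modeled on CouchDB's _id/_rev fields (this is not CouchDB's code).
function RevisionStore() {
  this.docs = {}; // id -> array of versions, oldest first
}

function copyOf(obj) {
  return JSON.parse(JSON.stringify(obj));
}

RevisionStore.prototype.save = function(id, doc) {
  var versions = this.docs[id] || [];
  var current = versions[versions.length - 1];
  // An update must name the revision it was based on; otherwise it conflicts.
  if (current && doc._rev !== current._rev) {
    throw new Error('Conflict: ' + id + ' has changed since it was read');
  }
  // Write a complete new version and mark the previous one obsolete.
  var next = copyOf(doc);
  next._id = id;
  next._rev = String(versions.length + 1);
  if (current) {
    current.obsolete = true;
  }
  versions.push(next);
  this.docs[id] = versions;
  return next._rev;
};

// Readers always get a copy of the newest version.
RevisionStore.prototype.get = function(id) {
  var versions = this.docs[id] || [];
  var latest = versions[versions.length - 1];
  return latest ? copyOf(latest) : null;
};
```

Keeping old versions instead of overwriting them in place is what lets readers holding an older revision carry on undisturbed while writers create new ones.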
Installation
Specific CouchDB libraries are not required to access the database, but they are useful for providing a high level abstraction and making code easier to work with. A CouchDB server is needed to test any examples, and does not require a lot of work to get running.
Installing CouchDB
The most recent version of CouchDB can be installed from the Apache project page. Installation instructions for a wide array of platforms can be found on the Wiki.
If you're running Windows, you will find a number of binary installers as well as instructions for building from source. Like many of the NoSQL options, installation is easiest and best supported on a Linux-based system, but don't be dissuaded.
Installing CouchDB's Node Module
Additional modules are not strictly necessary because CouchDB exposes all of its services through REST, as described in more detail later.
Using CouchDB over HTTP
One of the nice things about CouchDB is that its entire API is plain HTTP. Because Node is great at interacting with HTTP, working with CouchDB is easy: database operations can be performed directly, without any additional client libraries.
Example 6.1. Retrieving a List of CouchDB stores via HTTP
var http = require('http');

http.createServer(function (req, res) {
  // Open an HTTP connection to CouchDB's default port
  var client = http.createClient(5984, "127.0.0.1");
  // GET /_all_dbs returns a JSON array of database names
  var request = client.request("GET", "/_all_dbs");
  request.end();
  request.on("response", function(response) {
    var responseBody = "";
    // Collect the body as it arrives
    response.on("data", function(chunk) {
      responseBody += chunk;
    });
    // Relay CouchDB's reply to the browser once it is complete
    response.on("end", function() {
      res.writeHead(200, {'Content-Type': 'text/plain'});
      res.write(responseBody);
      res.end();
    });
  });
}).listen(8080);
This example shows how to generate a list of databases in
the current CouchDB installation. In this case there is no
authentication or administrative permission on the CouchDB server—a
decidedly bad idea for a database connected to the Internet, but
suitable for demonstration purposes.
A client connection is created using the http library. Nothing distinguishes this connection from any other HTTP connection; since CouchDB is RESTful, no additional communication protocol is needed. Of special note is the request.end() call inside the createServer callback. If this line is omitted, the request to CouchDB is never completed and the page will hang.
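The http.createClient() API used in these examples comes from early versions of Node; it has since been deprecated and removed in favor of http.request(). As a hedged sketch of the same /_all_dbs query written against http.request() (the listDatabases name and its callback shape are our own, not part of any CouchDB library):

```javascript
var http = require('http');

// Hypothetical helper: fetch the list of databases from a CouchDB server
// using http.request(), which replaces the removed http.createClient().
// Calls back with (err) or (null, arrayOfDatabaseNames).
function listDatabases(host, port, callback) {
  var request = http.request({
    host: host,
    port: port,
    method: 'GET',
    path: '/_all_dbs'
  }, function(response) {
    var body = '';
    response.setEncoding('utf8');
    response.on('data', function(chunk) { body += chunk; });
    response.on('end', function() {
      if (response.statusCode !== 200) {
        return callback(new Error('CouchDB responded with ' + response.statusCode));
      }
      var dbs;
      try {
        dbs = JSON.parse(body);
      } catch (e) {
        return callback(e);
      }
      callback(null, dbs);
    });
  });
  // Network failures (e.g. ECONNREFUSED if CouchDB is not running) arrive here.
  request.on('error', callback);
  // As before, the request is not completed until end() is called.
  request.end();
}

// Usage, assuming CouchDB is listening on its default port:
// listDatabases('127.0.0.1', 5984, function(err, dbs) {
//   console.log(err ? err.message : dbs);
// });
```

Parsing the JSON before calling back means the caller receives a real array rather than a string, and keeps a parse failure from being reported twice.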
As mentioned earlier, all CouchDB methods are exposed as HTTP calls. Creating and deleting databases, therefore, involves sending the appropriate PUT and DELETE requests to the server.
Example 6.2. Creating a CouchDB Database
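var http = require('http');

var client = http.createClient(5984, "127.0.0.1");
// PUT /dbname asks CouchDB to create a database named "dbname"
var request = client.request("PUT", "/dbname");
request.end();
request.on("response", function(response) {
  // Discard the JSON body so that the "end" event fires
  response.resume();
  response.on("end", function() {
    if ( response.statusCode == 201 ) {
      console.log("Database successfully created.");
    } else {
      console.log("Could not create database.");
    }
  });
});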
Here, /dbname refers to the resource being accessed. Combined with a PUT request, it instructs CouchDB to create a new database called dbname. An HTTP 201 (Created) response code confirms that the database was created.
Example 6.3. Deleting a CouchDB Database
var http = require('http');

var client = http.createClient(5984, "127.0.0.1");
// DELETE /dbname removes the database named "dbname"
var request = client.request("DELETE", "/dbname");
request.end();
request.on("response", function(response) {
  // Discard the JSON body so that the "end" event fires
  response.resume();
  response.on("end", function() {
    if ( response.statusCode == 200 ) {
      console.log("Deleted database.");
    } else {
      console.log("Could not delete database.");
    }
  });
});
Deleting the resource is the reverse of a PUT: the DELETE request. An HTTP 200 (OK) response code confirms that the request completed successfully.
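The three requests so far differ only in HTTP method, path, and the status code that signals success. A small helper can make that mapping explicit. In this sketch, couchDbRequest and its return shape are hypothetical; the name check encodes CouchDB's documented rule that a database name must start with a lowercase letter and may contain only lowercase letters, digits, and the characters _ $ ( ) + - /.

```javascript
// Hypothetical helper: describe each database-level CouchDB operation as an
// HTTP method, a path, and the status code that indicates success.
var VALID_DB_NAME = /^[a-z][a-z0-9_$()+\/-]*$/;

function couchDbRequest(operation, dbName) {
  if (operation === 'list') {
    return { method: 'GET', path: '/_all_dbs', success: 200 };
  }
  if (!VALID_DB_NAME.test(dbName || '')) {
    throw new Error('Invalid database name: ' + dbName);
  }
  // "/" is allowed in names but must be URL-encoded (%2F) in the path.
  var path = '/' + encodeURIComponent(dbName);
  if (operation === 'create') {
    return { method: 'PUT', path: path, success: 201 };
  }
  if (operation === 'delete') {
    return { method: 'DELETE', path: path, success: 200 };
  }
  throw new Error('Unknown operation: ' + operation);
}
```

The method and path could be passed to http.request() along with a host and port, and the response's statusCode compared with success.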
These elements aren't very useful on their own, but they can be combined into a very basic, if unfriendly, database manager using the following functions.
Example 6.4. A Simple CouchDB Database Creation Form
var http = require('http');
var qs = require('querystring');
var url = require('url');

var dbHost = "127.0.0.1";
var dbPort = 5984;

// Delete the database at dbpath (e.g. "/mydb") and redisplay the list
var deleteDb = function(res, dbpath) {
  var client = http.createClient(dbPort, dbHost);
  var request = client.request("DELETE", dbpath);
  request.end();
  request.on("response", function(response) {
    response.resume(); // discard the body so that "end" fires
    response.on("end", function() {
      if ( response.statusCode == 200 ) {
        showDbs(res, "Deleted database.");
      } else {
        showDbs(res, "Could not delete database.");
      }
    });
  });
};

// Create a database called dbname and redisplay the list
var createDb = function(res, dbname) {
  var client = http.createClient(dbPort, dbHost);
  var request = client.request("PUT", "/" + dbname);
  request.end();
  request.on("response", function(response) {
    response.resume(); // discard the body so that "end" fires
    response.on("end", function() {
      if ( response.statusCode == 201 ) {
        showDbs(res, dbname + " created.");
      } else {
        showDbs(res, "Could not create " + dbname);
      }
    });
  });
};

// Render the creation form, an optional status message, and the list of
// databases; each entry links to a URL that deletes that database
var showDbs = function(res, message) {
  var client = http.createClient(dbPort, dbHost);
  var request = client.request("GET", "/_all_dbs");
  request.end();
  request.on("response", function(response) {
    var responseBody = "";
    response.on("data", function(chunk) {
      responseBody += chunk;
    });
    response.on("end", function() {
      res.writeHead(200, {'Content-Type': 'text/html'});
      res.write("<form method='post'>");
      res.write("New Database Name: <input type='text' name='dbname' />");
      res.write("<input type='submit' />");
      res.write("</form>");
      if ( null != message ) res.write("<h1>" + message + "</h1>");
      res.write("<h1>Active databases:</h1>");
      res.write("<ul>");
      var dblist = JSON.parse(responseBody);
      for ( var i = 0; i < dblist.length; i++ ) {
        var dbname = dblist[i];
        res.write("<li><a href='/" + dbname + "'>" + dbname + "</a></li>");
      }
      res.write("</ul>");
      res.end();
    });
  });
};

http.createServer(function (req, res) {
  if ( req.method == 'POST' ) {
    // Collect and parse the form submission
    var body = '';
    req.on('data', function (data) {
      body += data;
    });
    req.on('end', function () {
      var POST = qs.parse(body);
      var dbname = POST['dbname'];
      if ( dbname ) {
        // Create the DB
        createDb(res, dbname);
      } else {
        showDbs(res, "Bad DB name, cannot create database.");
      }
    });
  } else {
    // Any path other than "/" is treated as a database to delete
    var path = url.parse(req.url).pathname;
    if ( path != "/" ) {
      deleteDb(res, path);
    } else {
      showDbs(res);
    }
  }
}).listen(8080);
Using node-couchdb
Working with CouchDB directly over HTTP is useful but verbose. Although it has the advantage of not needing external libraries, most developers opt for higher-level abstraction layers regardless of how simple their database's native driver implementation is. In this section, we'll look at the node-couchdb package (published on npm as felix-couchdb), which simplifies the interface between Node and CouchDB.
The driver for CouchDB can be installed using npm:
npm install felix-couchdb
Working with Databases
The module's first obvious benefit is succinct program code, as in the following example:
Example 6.5. Creating a Database in CouchDB
var dbHost = "127.0.0.1";
var dbPort = 5984;
var dbName = 'users';

var couchdb = require('felix-couchdb');
var client = couchdb.createClient(dbPort, dbHost);
var db = client.db(dbName);

db.exists(function(err, exists) {
  if (err) {
    console.log(JSON.stringify(err));
  } else if (!exists) {
    db.create();
    console.log('Database ' + dbName + ' created.');
  } else {
    console.log('Database ' + dbName + ' exists.');
  }
});
This example checks for a database called users, creating one if it doesn't already exist. Notice the similarities between the createClient function call here and the one from the http module demonstrated earlier. This is no accident; even though the module makes CouchDB's interfaces easier to work with, in the end you are using HTTP to transmit data.
Creating Documents
Now we'll save a document into the CouchDB database created in the previous example.
Example 6.6. Creating a Document in CouchDB
var dbHost = "127.0.0.1";
var dbPort = 5984;
var dbName = 'users';

var couchdb = require('felix-couchdb');
var client = couchdb.createClient(dbPort, dbHost);

var user = {
  name: {
    first: 'John',
    last: 'Doe'
  }
};

var db = client.db(dbName);
// Save the object under the document ID 'jdoe'
db.saveDoc('jdoe', user, function(err, doc) {
  if (err) {
    console.log(JSON.stringify(err));
  } else {
    console.log('Saved user.');
  }
});
This example creates a user named John Doe in the database, with the username jdoe as its document ID. Notice that the user is created as a plain JavaScript object and passed directly into the client; no more work is needed to serialize the information.
After running this example, the user can be accessed in the web browser at http://127.0.0.1:5984/users/jdoe.
Reading Documents
Once documents are stored in CouchDB, they can be retrieved again as objects.
Example 6.7. Retrieving a Record from CouchDB
var dbHost = "127.0.0.1";
var dbPort = 5984;
var dbName = 'users';

var couchdb = require('felix-couchdb');
var client = couchdb.createClient(dbPort, dbHost);
var db = client.db(dbName);

db.getDoc('jdoe', function(err, doc) {
  if (err) {
    console.log(JSON.stringify(err));
    return;
  }
  console.log(doc);
});
The output from this query is:
{ _id: 'jdoe',
  _rev: '3-67a7414d073c9ebce3d4af0a0e49691d',
  name: { first: 'John', last: 'Doe' }
}
There are three steps happening here:
1. Connect to the database server using createClient.
2. Select the document store using the client's db command.
3. Get the document using the database's getDoc command.
In this case, the record with ID jdoe, created in the previous example, is retrieved from the database. If the record did not exist (because it was deleted or not yet inserted), the callback's error parameter would contain data about the error.
Updating Documents
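Updating documents uses the same saveDoc command as creating documents. If CouchDB finds an existing record with the same ID, the saved document must carry that record's current revision (its _rev field); CouchDB then stores it as a new revision that supersedes the old one. A save with a missing or outdated _rev is rejected as a conflict.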
This example demonstrates how to update a document after reading it from the data store.
Example 6.8. Updating a Record in CouchDB
var dbHost = "127.0.0.1";
var dbPort = 5984;
var dbName = 'users';

var couchdb = require('felix-couchdb');
var client = couchdb.createClient(dbPort, dbHost);
var db = client.db(dbName);

db.getDoc('jdoe', function(err, doc) {
  // doc includes the current _rev, so saving it creates a new revision
  doc.name.first = 'Johnny';
  doc.email = 'jdoe@johndoe.com';
  db.saveDoc('jdoe', doc);
  db.getDoc('jdoe', function(err, revisedUser) {
    console.log(revisedUser);
  });
});
The output from this operation is:
{ _id: 'jdoe',
_rev: '7-1fb9a3bb6db27cbbbf1c74b2d601ccaa',
name: { first: 'Johnny', last: 'Doe' },
email: 'jdoe@johndoe.com'
}
This example reads the jdoe user from the data store, gives it an email address and a new first name, and saves it back into CouchDB.
Notice that saveDoc and the second getDoc follow the initial read, instead of putting getDoc inside saveDoc's callback. This relies on the couchdb driver queuing commands and executing them sequentially, so that the second read does not complete before the updates are saved. If you are not certain a driver behaves this way, nesting the second getDoc inside saveDoc's callback guarantees the order.
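Whether a particular driver serializes commands is an implementation detail. As a minimal sketch of the general technique (CommandQueue is a hypothetical name, and this is not felix-couchdb's code), a first-in-first-out queue that starts each command only after the previous one reports completion looks like this:

```javascript
// Minimal FIFO command queue: each task receives a `done` callback, and the
// next task starts only after the current one has called it.
function CommandQueue() {
  this.tasks = [];
  this.running = false;
}

CommandQueue.prototype.push = function(task) {
  this.tasks.push(task);
  if (!this.running) {
    this._next();
  }
};

CommandQueue.prototype._next = function() {
  var self = this;
  var task = this.tasks.shift();
  if (!task) {
    this.running = false;
    return;
  }
  this.running = true;
  var called = false;
  task(function done() {
    if (called) return; // ignore duplicate completions
    called = true;
    self._next();
  });
};

// Usage: the "get" runs only after the slower "save" finishes.
// var queue = new CommandQueue();
// queue.push(function(done) { setTimeout(function() { console.log('save'); done(); }, 50); });
// queue.push(function(done) { console.log('get'); done(); });
```

The trade-off is throughput: commands that could safely run in parallel still wait their turn, in exchange for a predictable order.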
Deleting Documents
To delete a document from CouchDB, you need to supply both an ID and a revision number. Fortunately, this is easy after a read:
Example 6.9. Deleting from CouchDB
var dbHost = "127.0.0.1";
var dbPort = 5984;
var dbName = 'users';

var couchdb = require('felix-couchdb');
var client = couchdb.createClient(dbPort, dbHost);
var db = client.db(dbName);

db.getDoc('jdoe', function(err, doc) {
  // Deleting requires both the document ID and its current revision
  db.removeDoc(doc._id, doc._rev);
});
After connecting to the CouchDB datastore, a getDoc command is issued here to get the internal ID (the _id field) and revision number (the _rev field) for that document. Once armed with this information, a removeDoc command is issued, which sends a DELETE request to the database.
Redis
Redis is a memory-centric key-value store with persistence that will feel very familiar if you have experience with key-value caches like Memcached. Redis is used when performance and scaling are important; in many cases developers choose to use it as a cache for data retrieved from a relational database such as MySQL, although it is capable of much more.
Beyond its key-value storage capabilities, Redis provides network-accessible shared memory, acts as a non-blocking event bus, and offers publish/subscribe messaging.
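Using Redis as a cache in front of a relational database usually follows the cache-aside pattern: check the cache, fall back to the database on a miss, then populate the cache. The sketch below uses in-memory stand-ins so it runs without any servers. The Map playing the part of Redis, the usersTable object standing in for a MySQL table, and the loadUserFromDb and getUser names are all hypothetical; a real application would make asynchronous calls to the redis module and a MySQL driver instead.

```javascript
// Hypothetical stand-ins: a Map plays the part of Redis (GET/SET on string
// values) and a plain object plays the part of a MySQL users table.
var cache = new Map();
var usersTable = { 42: { id: 42, name: 'John Doe' } };
var dbQueries = 0;

function loadUserFromDb(id) {
  dbQueries++; // count how often the "slow" database is hit
  return usersTable[id] || null;
}

// Cache-aside read: try the cache first, fall back to the database on a
// miss, then store the row (serialized, since Redis values are strings).
function getUser(id) {
  var key = 'user:' + id;
  var cached = cache.get(key);
  if (cached !== undefined) {
    return JSON.parse(cached);
  }
  var user = loadUserFromDb(id);
  if (user) {
    cache.set(key, JSON.stringify(user));
  }
  return user;
}
```

In production you would also give cached keys an expiry (for example with Redis's EXPIRE command) or delete them when the underlying row changes, so the cache does not keep serving stale data.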
Installation
Like most of the other database engines, using Redis requires installing the database application as well as the Node driver to communicate with it.
Installing Redis
Redis is available in source form. There isn't anything to do in the way of configuration; just download and compile as per the instructions on the website.
If you are using Windows, you are on your own at the time of writing because Redis is not supported on Windows. Fortunately, there is a passionate community behind Redis development and several ports have been made available for both Cygwin and native compilation. The port at https://github.com/dmajkic/redis compiles to a native Windows binary using MinGW.
Installing Redis's Node Module
The redis module's source is available on GitHub, but the module can be installed using npm:
npm install redis
Optionally, you may install the minimalist hiredis library along with Node's redis module.
Basic Usage
This example demonstrates a basic SET and GET operation against Redis from Node:
Example 6.10. A Basic Get and Set Operation against Redis
var redis = require('redis'),
    client = redis.createClient();

// Without an error handler, connection problems would crash the program
client.on("error", function (err) {
  console.log("Error " + err);
});

console.log("Setting key1");
// redis.print is a convenience callback that logs the reply
client.set("key1", "My string!", redis.print);
console.log("Getting key1");
client.get("key1", function (err, reply) {
  console.log("Results for key1:");
  console.log(reply);
  client.end();
});
This example begins by creating a connection to the Redis database and setting a callback to handle errors. If you are not running an instance of the Redis server, you will receive an error like this:
Error Error: Redis connection to 127.0.0.1:6379 failed - ECONNREFUSED, Connection refused
Tip
Note the lack of nested callbacks in this example: the get call is not issued from inside the set call's callback. If you need to perform database reads immediately after writing, it is safer to issue them from the write's callback, to ensure your code is executed in the correct sequence.
After the connection is opened, the client sets a string key and then reads its value back from the store. Library calls have the same names as the basic Redis commands (SET, GET, HSET, and so on). Redis treats data coming through the set command as strings, and allows values up to 512MB in size.
Hashes
Hashes are objects that contain multiple fields stored under a single key. The following example sets one field at a time:
Example 6.11. Setting Hash Values one Key at a Time
var redis = require('redis'),
    client = redis.createClient();

client.on("error", function (err) {
  console.log("Error " + err);
});

console.log("Setting user hash");
// One HSET call per field
client.hset("user", "username", "johndoe");
client.hset("user", "firstname", "john");
client.hset("user", "lastname", "doe");
// HKEYS returns the names of all fields in the hash
client.hkeys("user", function(err, replies) {
  console.log("Results for user:");
  console.log(replies.length + " replies:");
  replies.forEach(function (reply, i) {
    console.log(i + ": " + reply);
  });
  client.end();
});
The next example shows how to set multiple fields at the same time:
Example 6.12. Setting Multiple Hash Values Simultaneously
var redis = require('redis'),
    client = redis.createClient();

client.on("error", function (err) {
  console.log("Error " + err);
});

console.log("Setting user hash");
// HMSET takes alternating field and value arguments
client.hmset("user", "username", "johndoe", "firstname", "john", "lastname", "doe");
client.hkeys("user", function(err, replies) {
  console.log("Results for user:");
  console.log(replies.length + " replies:");
  replies.forEach(function (reply, i) {
    console.log(i + ": " + reply);
  });
  client.end();
});
The same could be accomplished by providing a more developer-friendly object, rather than breaking it out into a list:
Example 6.13. Setting Multiple Hash Values using an Object
var redis = require('redis'),
    client = redis.createClient();

client.on("error", function (err) {
  console.log("Error " + err);
});

var user = {
  username: 'johndoe',
  firstname: 'John',
  lastname: 'Doe',
  email: 'john@johndoe.com',
  website: 'http://www.johndoe.com'
};

console.log("Setting user hash");
// Each property of the object becomes a field in the hash
client.hmset("user", user);
client.hkeys("user", function(err, replies) {
  console.log("Results for user:");
  console.log(replies.length + " replies:");
  replies.forEach(function (reply, i) {
    console.log(i + ": " + reply);
  });
  client.end();
});
Instead of manually supplying each field to Redis, you can pass an entire object into hmset, which will parse the fields and send the correct information to Redis.
Note
Be careful to use hmset and not hset when setting multiple fields. Forgetting that a single object contains multiple values is a common pitfall.
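What hmset does with an object can be pictured as flattening it into the alternating field/value list used in Example 6.12. HGETALL goes the other way: on the wire it replies with a flat list of field names and values, which client libraries typically convert back into an object. The helpers below (toHashArgs and fromHashReply, both hypothetical names) sketch that conversion; they are not the redis module's internals.

```javascript
// Hypothetical helpers showing the conversion between a JavaScript object
// and the flat field/value argument list that HMSET takes.
function toHashArgs(obj) {
  var args = [];
  Object.keys(obj).forEach(function(field) {
    args.push(field, String(obj[field])); // Redis stores values as strings
  });
  return args;
}

// The reverse: turn a flat [field, value, field, value, ...] reply, as
// HGETALL sends it over the wire, back into an object.
function fromHashReply(reply) {
  var obj = {};
  for (var i = 0; i < reply.length; i += 2) {
    obj[reply[i]] = reply[i + 1];
  }
  return obj;
}

// toHashArgs({ username: 'johndoe', age: 30 }) gives
// ['username', 'johndoe', 'age', '30']
```

Note the round trip is not lossless: a numeric value such as the hypothetical age field comes back as the string '30', so convert types explicitly when reading hashes.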
Lists
The list type can be thought of as multiple values inside one key. Because it's possible to push content to the beginning or end of a list, these collections are ideal for showing ordered events such as lists of users who have recently received an honor.
Example 6.14. Using a List in Redis
var redis = require('redis'),
    client = redis.createClient();

client.on("error", function (err) {
  console.log("Error " + err);
});

// LPUSH adds each user to the head (left) of the list
client.lpush("pendingusers", "user1");
client.lpush("pendingusers", "user2");
client.lpush("pendingusers", "user3");
client.lpush("pendingusers", "user4");
// RPOP removes from the tail (right), i.e. the oldest entry
client.rpop("pendingusers", function(err, username) {
  if ( !err ) {
    console.log("Processing " + username);
  }
  client.end();
});
The output from this example is:
Processing user1
This example demonstrates a first-in-first-out (FIFO) queue using Redis's list commands. A real-world use for this pattern is in registration systems: when incoming registration requests are far too numerous to handle in real time, registration data is hived off to a queue for processing outside the main application. Registrations are processed in the order they were received, but the primary application is not slowed down by handling the actual record creation and introductory tasks such as welcome emails.
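The FIFO behavior comes from pushing and popping at opposite ends of the list. As a sketch that runs without a server, a JavaScript array can stand in for the Redis list, with unshift() playing LPUSH and pop() playing RPOP; the lpush, rpop, and lpop helper names are hypothetical. Popping from the same end you push to gives LIFO, stack-like behavior instead.

```javascript
// A JavaScript array standing in for a Redis list; index 0 is the head.
var pendingUsers = [];

// Like LPUSH: add at the head and return the new length.
function lpush(list, value) { return list.unshift(value); }
// Like RPOP: remove from the tail; null when empty (Redis returns nil).
function rpop(list) { return list.length ? list.pop() : null; }
// Like LPOP: remove from the head.
function lpop(list) { return list.length ? list.shift() : null; }

['user1', 'user2', 'user3', 'user4'].forEach(function(user) {
  lpush(pendingUsers, user);
});
// pendingUsers is now ['user4', 'user3', 'user2', 'user1']
```

rpop(pendingUsers) then yields 'user1', the oldest entry, matching Example 6.14's output, while lpop(pendingUsers) would yield the newest.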
Sets
Sets are used in situations where it is desirable to have lists of non-repeated items.
Example 6.15. Using Redis' Set Commands
var redis = require('redis'),
    client = redis.createClient();

client.on("error", function (err) {
  console.log("Error " + err);
});

client.sadd("myteam", "Neil");
client.sadd("myteam", "Peter");
client.sadd("myteam", "Brian");
client.sadd("myteam", "Scott");
client.sadd("myteam", "Brian"); // already a member, so nothing is added
client.smembers("myteam", function(err, members) {
  console.log(members);
  client.end();
});
The output is:
[ 'Brian', 'Scott', 'Neil', 'Peter' ]
Even though 'Brian' was added to the set twice, he was stored only once. (Sets are unordered, so SMEMBERS may return the members in a different order.) In a real-world situation it would be entirely possible to have two team members named Brian; this highlights the importance of ensuring that your values are unique when they need to be. Otherwise the set can cause unintended behavior when you expect more elements than are actually present, because repeated items are silently dropped.
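SADD reports how many of the given members were actually added, so the second 'Brian' returns 0. Checking that return value is a cheap way to notice unexpected duplicates. The sketch below mimics this bookkeeping with a JavaScript Set; the sadd helper is a hypothetical stand-in, not the redis module's function.

```javascript
// Hypothetical stand-in for SADD: returns how many of the given members
// were not already in the set (Redis replies with the same count).
function sadd(set) {
  var added = 0;
  for (var i = 1; i < arguments.length; i++) {
    if (!set.has(arguments[i])) {
      set.add(arguments[i]);
      added++;
    }
  }
  return added;
}

var myteam = new Set();
var results = ['Neil', 'Peter', 'Brian', 'Scott', 'Brian'].map(function(name) {
  return sadd(myteam, name);
});
// results is [1, 1, 1, 1, 0] and myteam.size is 4
```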
Sorted Sets
Like regular sets, sorted sets do not allow duplicate members. Sorted sets add the concept of weighting, enabling score-based operations on data such as leaderboards, top scores, and content tables.
The producers of American weight-loss reality show The Biggest Loser are real-world fans of sorted sets. In the series' 11th season, the contestants were split into three groups based upon their age. On air they had to perform a crude sorting operation by checking a number printed on everyone's shirts and line up in ascending order under the hot sun. If one of the contestants had brought their Node and Redis-equipped laptop to the competition they might have made a small program to do the work for them.
Example 6.16. Ranking a Sorted List using Redis
var redis = require('redis'),
    client = redis.createClient();

client.on("error", function (err) {
  console.log("Error " + err);
});

// ZADD key score member: each contestant's age is the score
client.zadd("contestants", 60, "Deborah");
client.zadd("contestants", 65, "John");
client.zadd("contestants", 26, "Patrick");
client.zadd("contestants", 62, "Mike");
client.zadd("contestants", 24, "Courtney");
client.zadd("contestants", 39, "Jennifer");
client.zadd("contestants", 26, "Jessica");
client.zadd("contestants", 46, "Joe");
client.zadd("contestants", 63, "Bonnie");
client.zadd("contestants", 27, "Vinny");
client.zadd("contestants", 27, "Ramon");
client.zadd("contestants", 51, "Becky");
client.zadd("contestants", 41, "Sunny");
client.zadd("contestants", 47, "Antone");
client.zadd("contestants", 40, "John"); // same member as above: updates John's score

// ZCARD returns the number of members in the sorted set
client.zcard("contestants", function(err, length) {
  if ( !err ) {
    var contestantCount = length;
    var membersPerTeam = Math.ceil(contestantCount / 3);
    // ZRANGE start and stop indexes are inclusive, ordered by score
    client.zrange("contestants", membersPerTeam * 0, membersPerTeam * 1 - 1,
      function(err, values) {
        console.log('Young team: ' + values);
      });
    client.zrange("contestants", membersPerTeam * 1, membersPerTeam * 2 - 1,
      function(err, values) {
        console.log('Middle team: ' + values);
      });
    client.zrange("contestants", membersPerTeam * 2, contestantCount,
      function(err, values) {
        console.log('Elder team: ' + values);
        client.end();
      });
  }
});
The output is:
Young team: Courtney,Jessica,Patrick,Ramon,Vinny
Middle team: Jennifer,John,Sunny,Joe,Antone
Elder team: Becky,Deborah,Mike,Bonnie
Adding members to a sorted set follows a similar pattern as adding members to a normal set, with the addition of a score. This allows for interesting slicing and dicing, as in this example. Knowing that each team consists of similarly aged individuals, getting three teams from a sorted list is a matter of pulling three equal-sized groups straight out of the set. Note that John is added twice; because sorted set members are unique, the second ZADD simply updates his score to 40, which is why the set holds 14 members rather than 15. The number of contestants (14) is not perfectly divisible by 3, so the final group has only four members.
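The output above can be reproduced without a server, which also makes the ordering rules explicit: members are ordered by score, ties are broken lexicographically by member name, and a repeated ZADD for the same member just updates its score. The sketch below is a plain-JavaScript stand-in for ZADD and ZRANGE; the zadd, zrange, and splitIntoTeams names are hypothetical, and it assumes plain ASCII member names so that JavaScript's string comparison matches Redis's byte-wise ordering.

```javascript
// Plain-JavaScript stand-in for a Redis sorted set: member -> score.
var contestants = new Map();

// Like ZADD: re-adding an existing member just updates its score.
function zadd(set, score, member) {
  set.set(member, score);
}

// Like ZRANGE: members ordered by score, ties broken by member name;
// start and stop are inclusive (negative indexes are not supported here).
function zrange(set, start, stop) {
  var ordered = Array.from(set.keys()).sort(function(a, b) {
    var diff = set.get(a) - set.get(b);
    if (diff !== 0) {
      return diff;
    }
    return a < b ? -1 : (a > b ? 1 : 0);
  });
  return ordered.slice(start, stop + 1);
}

// Split the set into teamCount consecutive groups, as Example 6.16 does.
function splitIntoTeams(set, teamCount) {
  var perTeam = Math.ceil(set.size / teamCount);
  var teams = [];
  for (var t = 0; t < teamCount; t++) {
    teams.push(zrange(set, t * perTeam, (t + 1) * perTeam - 1));
  }
  return teams;
}

var ages = [
  [60, 'Deborah'], [65, 'John'], [26, 'Patrick'], [62, 'Mike'],
  [24, 'Courtney'], [39, 'Jennifer'], [26, 'Jessica'], [46, 'Joe'],
  [63, 'Bonnie'], [27, 'Vinny'], [27, 'Ramon'], [51, 'Becky'],
  [41, 'Sunny'], [47, 'Antone'], [40, 'John']
];
ages.forEach(function(pair) {
  zadd(contestants, pair[0], pair[1]);
});
```

Calling splitIntoTeams(contestants, 3) yields the same three teams as Example 6.16, including Jessica ahead of Patrick and Ramon ahead of Vinny where their ages tie.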
Subscriptions
Redis supports the pub/sub messaging pattern, allowing senders (publishers) to issue messages into channels for use by receivers (subscribers) whom they know nothing about. Subscribers register their areas of interest (channels), and Redis pushes all relevant messages to them. Publishers do not need to be registered to specific channels. Subscribers, however, receive only the messages published while they are subscribed; Redis does not store messages for later delivery. Redis takes care of the brokering, which allows for a great deal of flexibility, as neither the publisher nor the subscriber needs to be aware of the other.
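The fire-and-forget delivery is easy to see in a tiny in-process broker built on Node's EventEmitter. This is a sketch of the pub/sub pattern, not of Redis itself; the Broker name is hypothetical, though its publish() mimics Redis's PUBLISH reply by returning how many subscribers received the message.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical in-process broker illustrating pub/sub semantics.
// Channel names are prefixed so a channel called "error" cannot collide
// with EventEmitter's special "error" event.
function Broker() {
  this.emitter = new EventEmitter();
}

Broker.prototype.subscribe = function(channel, listener) {
  this.emitter.on('message:' + channel, listener);
};

Broker.prototype.unsubscribe = function(channel, listener) {
  this.emitter.removeListener('message:' + channel, listener);
};

// Deliver to whoever is subscribed right now; nothing is stored for later.
Broker.prototype.publish = function(channel, message) {
  var event = 'message:' + channel;
  var receivers = this.emitter.listenerCount(event);
  this.emitter.emit(event, message);
  return receivers;
};
```

A message published before anyone subscribes, or after everyone has unsubscribed, reaches no one and is simply gone, which is exactly the Redis behavior described above.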
Example 6.17. Subscribing and Publishing with Redis
var redis = require("redis"),
    talkativeClient = redis.createClient(),
    pensiveClient = redis.createClient();

// Fired once per channel; count is the number of active subscriptions
pensiveClient.on("subscribe", function (channel, count) {
  talkativeClient.publish(channel, "Welcome to " + channel);
  talkativeClient.publish(channel, "You subscribed to " + count + " channels!");
});

pensiveClient.on("unsubscribe", function(channel, count) {
  // Once no subscriptions remain, close both connections
  if (count === 0) {
    talkativeClient.end();
    pensiveClient.end();
  }
});

pensiveClient.on("message", function (channel, message) {
  console.log(channel + ': ' + message);
});

pensiveClient.on("ready", function() {
  pensiveClient.subscribe("quiet channel", "peaceful channel", "noisy channel");
  // Unsubscribe from everything after about one second
  setTimeout(function() {
    pensiveClient.unsubscribe("quiet channel", "peaceful channel", "noisy channel");
  }, 1000);
});
The output is:
quiet channel: Welcome to quiet channel
quiet channel: You subscribed to 1 channels!
peaceful channel: Welcome to peaceful channel
peaceful channel: You subscribed to 2 channels!
noisy channel: Welcome to noisy channel
noisy channel: You subscribed to 3 channels!
This example tells the story of two clients. One is pensive and thoughtful, while the other broadcasts inane details about its surroundings to anyone who will listen. The pensive client subscribes to three channels: quiet, peaceful, and noisy. The talkative client responds to each subscription by welcoming the newcomer to the channel and counting the number of active subscriptions.
About one second after subscribing, the pensive client unsubscribes from all three channels. When the unsubscribe command detects no more active subscriptions, both clients end their connection to Redis and the program execution stops.
Securing Redis
Redis supports password authentication. To add a password, edit
Redis's configuration file and include a line for
requirepass.
Example 6.18. Snippet from Redis Password Configuration
################################## SECURITY ###################################
# Require clients to issue AUTH <PASSWORD> before processing any other
# commands. This might be useful in environments in which you do not trust
# others with access to the host running redis-server.
#
# This should stay commented out for backward compatibility and because most
# people do not need auth (e.g. they run their own servers).
#
requirepass hidengoseke
Once Redis is restarted, it will perform commands only for clients who authenticate using "hidengoseke" as their password.
Example 6.19. Authenticating Redis
var redis = require('redis'),
client = redis.createClient();
client.auth("hidengoseke");
The auth command must occur before any
other queries are issued. The client will store the password and use
it on reconnection attempts.
Notice the lack of usernames and multiple passwords. At the time of writing, Redis does not include user management functionality because of the overhead it would incur. Instead, system administrators are expected to secure their servers using other means, such as blocking Redis's port from the outside world so that only internal, trusted users can access it.
Some "dangerous" commands can be renamed or removed entirely. For example, you may never need to use the CONFIG command. In that case, you can update the configuration file to either change its name to something obscure, or fully disable it to protect against unwanted access.
Example 6.20. Renaming Redis Commands
# Change CONFIG command to something obscure
rename-command CONFIG 923jfiosflkja98rufadskjgfwefu89awtsga09nbhsdalkjf3p49
# Clear CONFIG command, so no one can use it
rename-command CONFIG ""
MongoDB
Because Mongo supplies a JavaScript environment with BSON object storage (a binary adaptation of JSON), reading and writing data from Node is extremely efficient. Mongo stores incoming records in memory, so it is ideal in high-write situations. Each new version adds improved clustering, replication, and sharding.
Because incoming records are stored in memory, inserting data into Mongo is non-blocking, making it ideal for logging operations and telemetry data. Mongo supports JavaScript functions inside queries, making it very powerful in read situations, including MapReduce queries.
Using MongoDB's document-based storage allows you to store child records inside parent records. For example, a blog article and all of its associated comments can be stored inside a single record, allowing for incredibly fast retrieval.
MongoDB Native Driver
The native mongodb driver by Christian Kvalheim provides non-blocking access to MongoDB. Previous versions of the module included a C/C++ BSON parser/serializer, which has been deprecated due to improvements in the JavaScript parser/serializer.
The native MongoDB driver is a good choice when you need precise control over your MongoDB connection.
Installation
npm install mongodb
Note
"mongodb" is not to be confused with "mongo" discussed later in this chapter.
Data Types
Node's MongoDB driver supports the following data types:
Table 6.1. Data Types supported for MongoDB
| Type | Description | Example |
| Array | A list of items | cardsInHand: [9,4,3] |
| Boolean | A true/false condition | hasBeenRead: false |
| Code | Represents a block of JavaScript code runnable inside the database | new BSON.Code('function quotient( dividend, divisor ) { return divisor == 0 ? 0 : dividend / divisor; }'); |
| Date | Represents a date and time | lastUpdated: new Date() |
| DBRef | Database reference[a] | bestFriendId: new BSON.DBRef('users', friendObjectId) |
| Integer | An integer (non-decimal) number | pageViews: 50 |
| Long | A 64-bit integer value | starsInUniverse: BSON.Long.fromString("9000000000000000000") |
| Hash | A key-value dictionary | userName: {'first': 'Sam', 'last': 'Smith'} |
| Null | A null value | bestFriend: null |
| Object ID | A 12-byte code used by MongoDB to index objects, represented as a 24-character hexadecimal string | myRecordId: new BSON.ObjectId() |
| String | A JavaScript string | fullName: 'Sam Smith' |
[a] Since MongoDB is a non-relational database, it does not support joins; DBRef is used by client libraries to implement logical relational joins.
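An ObjectId's first four bytes hold its creation time in seconds since the Unix epoch, so the 24-character hexadecimal form can be decoded without a database call. The driver's ObjectID type offers its own getTimestamp() method; the objectIdTimestamp helper below is a hypothetical stand-alone sketch of the same decoding.

```javascript
// Hypothetical helper: decode the creation time embedded in an ObjectId's
// 24-character hexadecimal string (first 8 hex digits = 4-byte timestamp).
function objectIdTimestamp(hex) {
  if (!/^[0-9a-fA-F]{24}$/.test(hex)) {
    throw new Error('Not a 24-character hex ObjectId: ' + hex);
  }
  var seconds = parseInt(hex.slice(0, 8), 16);
  return new Date(seconds * 1000);
}

// The _id printed in Example 6.21's output:
// objectIdTimestamp('4e9cd8204276d9f91a000001').toISOString()
//   → '2011-10-18T01:36:32.000Z'
```

Because the timestamp leads the ObjectId, sorting by _id roughly sorts documents by insertion time, at one-second resolution.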
Writing Records
Writing records to a MongoDB collection involves creating a JavaScript object inside Node and saving it directly into Mongo. This example demonstrates building a user object and saving it into MongoDB.
Example 6.21. Connecting to a MongoDB Database and Writing a Record
var mongo = require('mongodb');

var host = "localhost";
var port = mongo.Connection.DEFAULT_PORT;
var db = new mongo.Db('node-mongo-examples', new mongo.Server(host, port, {}), {});

db.open(function(err, db) {
  // Collections are created automatically on first insert
  db.collection('users', function(err, collection) {
    collection.insert({username: 'Bilbo', firstname: 'Shilbo'}, function(err, docs) {
      // The driver adds a generated _id to each inserted document
      console.log(docs);
      db.close();
    });
  });
});
The output is:
[ { username: 'Bilbo',
    firstname: 'Shilbo',
    _id: 4e9cd8204276d9f91a000001 } ]
Mongoose
Node has a tremendous base of support for Mongo through the Mongoose library. Compared to the native driver, Mongoose is an expressive environment that makes models and schemas more intuitive.
Installation
The fastest way to get up and running with Mongoose is by installing it with npm:
npm install mongoose
Alternatively, you may download the most recent version from source and compile it yourself using instructions from the Mongoose project's home page:
http://mongoosejs.com/
Defining Schemas
When you use MongoDB, you don't need to define a data schema as you would with a relational database. Whenever requirements change or you need to store a new piece of information, you just save a new record containing the information you need and can query against it immediately. You can transform old data to include default or empty values for the new field, but MongoDB does not require that step.
Even though schemas aren't important to MongoDB, they are useful because they help humans understand the contents of the database and implicit rules for working with domain data. Mongoose is useful because it works using human-readable schemas, providing a clean interface to communicate with the database.
What is a schema? Many programmers tend to think in terms of models that define data structures, but don't think much about the underlying databases those models represent. A table inside an SQL database needs to be created before you can write data to it, and the fields inside that table probably closely match the fields in your model. The schema (that is, the definition of the model inside the database) is created separately from your program; therefore, the schema predates your data.
MongoDB, like the other NoSQL datastores, is often said to be schemaless because it doesn't require an explicitly defined structure for stored data. In reality, MongoDB does have a schema, but it is defined by the data as it gets stored. You may add a new property to your model months after you begin work on your application, and you don't have to redefine the schema of previously entered information in order to search against the new field.
Example 6.22. Defining Schemas with Mongoose
var mongoose = require('mongoose');
var Schema = mongoose.Schema,
ObjectId = Schema.ObjectId;
var AuthorSchema = new Schema({
name: {
first : String,
last : String,
full : String
},
contact: {
email : String,
twitter : String,
google : String
},
photo : String
});
var CommentSchema = new Schema({
commenter : String,
body : String,
posted : Date
});
var ArticleSchema = new Schema({
author : ObjectId,
title : String,
contents : String,
published : Date,
comments : [CommentSchema]
});
var Author = mongoose.model('Author', AuthorSchema);
var Article = mongoose.model('Article', ArticleSchema);
This example illustrates how to define a sample schema for an article database, and what information should be stored in each type of model. Once again, Mongo does not enforce schemas, but programmers need to define consistent access patterns in their own programs.
Manipulating Collections
Mongoose allows direct manipulation of object collections:
Example 6.23. Reading and Writing Records using Mongoose
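// Assumes mongoose and the Author model from Example 6.22 are already defined
mongoose.connect('mongodb://localhost:27017/upandrunning', function(err){
if (err) {
console.log('Could not connect to mongo');
}
});
// An illustrative author record built from the Author model
var newAuthor = new Author({
name: { first: 'Jane', last: 'Doe', full: 'Jane Doe' }
});
newAuthor.save(function(err) {
if (err) {
console.log('Could not save author');
} else {
console.log('Author saved');
// Query only after the save completes, so the new author is included
Author.find(function(err, doc){
if (err) return console.log('Could not find authors');
console.log(doc);
});
}
});
This example saves an author into the database and logs all authors to the screen.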
Performance
When you work with Mongoose, you don't need to wait for a connection to MongoDB before issuing commands, because all of your schema definitions and queries are buffered until you connect. This is a big deal, and an important way Mongoose serves Node's methodology. By issuing all of the "live" commands at once against Mongo, you limit the amount of time and the number of callbacks needed to work with your data, and greatly increase the number of operations your application is able to perform.
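The buffering idea can be illustrated with a small sketch. The BufferedClient class below is hypothetical and written for this example; it is not Mongoose's internal implementation. It queues operations issued before connect() is called and runs them, in the order they were issued, once the connection exists:

```javascript
// Hypothetical class illustrating command buffering; not Mongoose's internals.
class BufferedClient {
  constructor() {
    this.connected = false;
    this.pending = []; // operations issued before the connection was ready
    this.log = [];     // records which operations actually ran, for the demo
  }

  // Run the operation immediately if connected; otherwise queue it.
  exec(operation) {
    if (this.connected) {
      operation();
    } else {
      this.pending.push(operation);
    }
  }

  // Mark the connection as ready and flush queued operations in issue order.
  connect() {
    this.connected = true;
    const queued = this.pending;
    this.pending = [];
    queued.forEach(operation => operation());
  }
}

const client = new BufferedClient();
client.exec(() => client.log.push('save author'));
client.exec(() => client.log.push('find authors'));
console.log(client.log.length); // 0, nothing has run yet
client.connect();
console.log(client.log);        // [ 'save author', 'find authors' ]
```

With Mongoose the queued operations would be real database commands, but the design choice is the same: calling code never has to check whether the connection is ready.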
Relational Databases
There are still many good reasons to use a traditional database with SQL, and Node interfaces with popular open source choices.
MySQL
MySQL has become the workhorse of the open source world for good reason. It provides many of the same capabilities as larger commercial databases for free. In its current form, MySQL is performant and feature-rich.
The node-db module provides a native code interface to popular database systems, including MySQL, using a common API that the module exposes to Node. Although node-db supports more than just MySQL, this section will focus on using MySQL in your application code. Since Oracle's purchase of Sun Microsystems, the future of MySQL and its community has come under much speculation. Some groups advocate moving to a drop-in replacement such as MariaDB or switching to a different RDBMS entirely. While MySQL isn't going away any time soon, you need to decide for yourself whether it will be the right choice of software for your work.
Using NodeDB
Installation
The MySQL client development libraries are a prerequisite for the Node database module. On Ubuntu, the libraries can be installed using apt:
sudo apt-get install libmysqlclient-dev
Using npm, install a package named db-mysql in your project directory:
npm install db-mysql
In order to run the examples in this section you will need to have a database called 'upandrunning' with a user 'dev' having password 'dev'. The following script will create the database table and basic schema:
DROP DATABASE IF EXISTS upandrunning;
CREATE DATABASE upandrunning;
GRANT ALL PRIVILEGES ON upandrunning.* TO 'dev'@'%' IDENTIFIED BY 'dev';
USE upandrunning;
CREATE TABLE users(
id int auto_increment primary key,
user_login varchar(25),
user_nicename varchar(75)
);
Selection
The following example selects the id and user_login columns from the users table:
Example 6.24. Selecting from MySQL
var mysql = require( 'db-mysql' );
var connectParams = {
'hostname': 'localhost',
'user': 'dev',
'password': 'dev',
'database': 'upandrunning'
};
var db = new mysql.Database( connectParams );
db.connect(function(error) {
if ( error ) return console.log("Failed to connect");
this.query()
.select(['id', 'user_login'])
.from('users')
.execute(function(error, rows, columns) {
if ( error ) {
console.log("Error on query");
} else {
console.log(rows);
}
});
});
As you can probably guess, this executes the equivalent of the SQL command SELECT id, user_login FROM users. The output is:
{ id: 1, user_login: 'mwilson' }
Insertion
Inserting data is very similar to selection because commands are chained in the same way. This is how to generate the equivalent of INSERT INTO users ( user_login ) VALUES ( 'newbie' );
Example 6.25. Inserting into MySQL
var mysql = require( 'db-mysql' );
var connectParams = {
'hostname': 'localhost',
'user': 'dev',
'password': 'dev',
'database': 'upandrunning'
};
var db = new mysql.Database( connectParams );
db.connect(function(error) {
if ( error ) return console.log("Failed to connect");
this.query()
.insert('users', ['user_login'], ['newbie'])
.execute(function(error, rows, columns) {
if ( error ) {
console.log("Error on query");
console.log(error);
}
else console.log(rows);
});
});
The output is:
{ id: 2, affected: 1, warning: 0 }
The .insert command takes three parameters:
The table name
The column names being inserted
The values to insert in each column
The database driver takes care of escaping and converting the data types of your column values, so values passed through this module are protected against SQL injection attacks.
Updating
Like selection and insertion, updates rely on chained functions to generate equivalent SQL queries. This example demonstrates the use of a query parameter to filter the update, rather than performing it across all records in the database table.
Example 6.26. Updating Data in MySQL
var mysql = require( 'db-mysql' );
var connectParams = {
'hostname': 'localhost',
'user': 'dev',
'password': 'dev',
'database': 'upandrunning'
};
var db = new mysql.Database( connectParams );
db.connect(function(error) {
if ( error ) return console.log("Failed to connect");
this.query()
.update('users')
.set({'user_nicename': 'New User' })
.where('user_login = ?', [ 'newbie' ])
.execute(function(error, rows, columns) {
if ( error ) {
console.log("Error on query");
console.log(error);
}
else console.log(rows);
});
});
The output is:
{ id: 0, affected: 1, warning: 0 }
Updating a row consists of three steps:
The .update command, which takes the table name (users in this case) as a parameter.
The .set command, which uses an object of key-value pairs to identify the column names to update and their values.
The .where command, which tells MySQL how to filter the rows that will be updated.
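The chaining itself is a simple pattern: each method records part of the statement on the builder object and returns that object, so the next call continues from it. The UpdateBuilder class below is a hypothetical toy for illustration, not db-mysql's code; it produces SQL text with ? placeholders plus a matching values array, and leaves escaping to the driver:

```javascript
// Hypothetical toy query builder; not db-mysql's implementation.
class UpdateBuilder {
  constructor() {
    this.table = null;
    this.assignments = {};
    this.condition = null;
    this.params = [];
  }

  // Each step stores its piece and returns `this` so calls can be chained.
  update(table) { this.table = table; return this; }
  set(values) { Object.assign(this.assignments, values); return this; }
  where(condition, params) {
    this.condition = condition;
    this.params = params || [];
    return this;
  }

  // Assemble the statement; the values array lines up with the ? placeholders.
  toSQL() {
    const columns = Object.keys(this.assignments);
    let sql = 'UPDATE ' + this.table + ' SET ' + columns.map(c => c + ' = ?').join(', ');
    // Without a where() call, the statement would update every row.
    if (this.condition) sql += ' WHERE ' + this.condition;
    const values = columns.map(c => this.assignments[c]).concat(this.params);
    return { sql: sql, values: values };
  }
}

const query = new UpdateBuilder()
  .update('users')
  .set({ user_nicename: 'New User' })
  .where('user_login = ?', ['newbie'])
  .toSQL();
console.log(query.sql);    // UPDATE users SET user_nicename = ? WHERE user_login = ?
console.log(query.values); // [ 'New User', 'newbie' ]
```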
Deletion
Deletion is very similar to updates, except in the case of a delete there are no columns to update. If no where conditions are specified, all records in the table will be deleted.
Example 6.27. Deleting Data in MySQL
var mysql = require( 'db-mysql' );
var connectParams = {
'hostname': 'localhost',
'user': 'dev',
'password': 'dev',
'database': 'upandrunning'
};
var db = new mysql.Database( connectParams );
db.connect(function(error) {
if ( error ) return console.log("Failed to connect");
this.query()
.delete()
.from('users')
.where('user_login = ?', [ 'newbie' ])
.execute(function(error, rows, columns) {
if ( error ) {
console.log("Error on query");
console.log(error);
}
else console.log(rows);
});
});
The output is:
{ id: 0, affected: 1, warning: 0 }
The .delete command is similar to the .update command, except it does not take any column names or data values. This example also demonstrates placeholder parameters in the "where" clause: 'user_login = ?'. Before execution, the question mark is replaced by the corresponding value from the second parameter ('newbie' in this code). That parameter is an array because, if multiple question marks are used, the database driver takes the values from it in order.
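A minimal sketch of what positional placeholder substitution involves follows. The formatQuery and escapeValue functions are hypothetical, written for illustration only, and are not db-mysql's implementation; a real driver also deals with dates, binary data, character sets, and question marks that appear inside quoted literals, so in real code always let the driver do this:

```javascript
// Hypothetical helpers for illustration only; not db-mysql's implementation.
function escapeValue(value) {
  if (value === null || value === undefined) return 'NULL';
  if (typeof value === 'number' && Number.isFinite(value)) return String(value);
  // Escape backslashes first, then single quotes, then wrap in quotes.
  return "'" + String(value).replace(/\\/g, '\\\\').replace(/'/g, "\\'") + "'";
}

// Replace each ? with the next value from the array, in order.
function formatQuery(sql, values) {
  let index = 0;
  const result = sql.replace(/\?/g, () => {
    if (index >= values.length) throw new Error('Not enough values for placeholders');
    return escapeValue(values[index++]);
  });
  if (index !== values.length) throw new Error('Too many values for placeholders');
  return result;
}

console.log(formatQuery('user_login = ? AND id > ?', ['newbie', 1]));
// user_login = 'newbie' AND id > 1
console.log(formatQuery('user_login = ?', ["x' OR '1'='1"]));
// user_login = 'x\' OR \'1\'=\'1'
```

The second call shows why escaping matters: the quotes in the hostile input end up escaped inside a single string literal instead of closing it early.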
Sequelize
Sequelize is an object relational mapper (ORM) that takes much of the repetition out of the tasks in the preceding sections. You can use Sequelize to define objects shared between the database and your program, then pass data to and from the database using those objects rather than writing a query for every operation. This is a major time saver when you need to perform maintenance or add a new column, and it makes overall data management less error-prone. Sequelize can be installed using npm:
npm install sequelize
Because the database and example user were already created for the examples in the previous section, it's time to create an Author entity inside the database. Sequelize handles the creation for you, so you don't have to write any SQL by hand at this point:
Example 6.28. Creating an Entity Using Sequelize
var Sequelize = require('sequelize');
var db = new Sequelize('upandrunning', 'dev', 'dev', {
host: 'localhost'
});
var Author = db.define('Author', {
name: Sequelize.STRING,
biography: Sequelize.TEXT
});
Author.sync().on('success', function() {
console.log('Author table was created.');
}).on('failure', function(error) {
console.log('Unable to create author table');
});
The output is:
Executing: CREATE TABLE IF NOT EXISTS `Authors` (`name` VARCHAR(255), `biography` TEXT, `id` INT NOT NULL auto_increment , `createdAt` DATETIME NOT NULL, `updatedAt` DATETIME NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB;
Author table was created.
In this example, an Author was defined as an entity containing a name field and a biography field. As you can see in the output, Sequelize added an auto-incremented primary key column, a createdAt column, and an updatedAt column. This is typical of many ORM solutions, and provides standard hooks by which Sequelize is able to reference and interact with your data.
Sequelize differs from the other libraries shown in this chapter in being based on a listener-driven architecture, rather than the callback-driven architecture used elsewhere. This means that you have to listen for both success and failure events after each operation, rather than having errors and success indicators returned with the operation's results.
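The difference between the two styles can be sketched with Node's built-in events module. Both save functions below are hypothetical stand-ins (nothing is actually stored); the listener style mirrors the success and failure events used with Sequelize in this chapter, while the callback style mirrors the other libraries:

```javascript
const EventEmitter = require('events');

// Callback style: one function receives either an error or the result.
function saveWithCallback(record, callback) {
  process.nextTick(() => {
    if (!record.name) return callback(new Error('name is required'));
    callback(null, record);
  });
}

// Listener style: the call returns an emitter, and the caller attaches
// separate 'success' and 'failure' listeners to it.
function saveWithEvents(record) {
  const emitter = new EventEmitter();
  // Defer the outcome so the caller has time to attach its listeners.
  process.nextTick(() => {
    if (!record.name) emitter.emit('failure', new Error('name is required'));
    else emitter.emit('success', record);
  });
  return emitter;
}

saveWithCallback({ name: 'Lynne Spears' }, (err, saved) => {
  if (err) return console.log('Callback: failed,', err.message);
  console.log('Callback: saved', saved.name);
});

// on() returns the emitter itself, which is what makes the chained style possible.
saveWithEvents({})
  .on('success', saved => console.log('Events: saved', saved.name))
  .on('failure', err => console.log('Events: failed,', err.message));
```

Deferring the emit with process.nextTick is the important detail: if the listener-style function emitted synchronously, the outcome would fire before the caller had attached any listeners.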
The following example creates two tables with a many-to-many relationship. The order of operation is:
Set up the entity schemas.
Synchronize the schemas with the actual database.
Create and save a Book object.
Create and save an Author object.
Establish a relationship between the author and the book.
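Why the order matters can be shown without a database. The sketch below is a hypothetical in-memory stand-in for the Authors, Books, and AuthorsBooks tables (it is not how Sequelize works internally): a join row holds one author ID and one book ID, so neither record can be linked until it has been saved and assigned an ID.

```javascript
// Hypothetical in-memory stand-ins for the three tables.
const tables = { Authors: [], Books: [], AuthorsBooks: [] };
const nextId = { Authors: 1, Books: 1 };

// "Saving" assigns an auto-incremented id, as the database would.
function save(table, record) {
  const row = Object.assign({ id: nextId[table]++ }, record);
  tables[table].push(row);
  return row;
}

// A join row needs both primary keys, so both records must be saved first.
function link(author, book) {
  if (author.id === undefined || book.id === undefined) {
    throw new Error('Save both records before linking them');
  }
  tables.AuthorsBooks.push({ AuthorId: author.id, BookId: book.id });
}

// Follow the join table from an author to that author's books.
function booksBy(author) {
  return tables.AuthorsBooks
    .filter(row => row.AuthorId === author.id)
    .map(row => tables.Books.find(book => book.id === row.BookId));
}

const book = save('Books', { name: 'Through the Storm' });
const author = save('Authors', { name: 'Lynne Spears' });
link(author, book);
console.log(booksBy(author).map(b => b.name)); // [ 'Through the Storm' ]
```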
Example 6.29. Saving Records and Associations using Sequelize
var Sequelize = require('sequelize');
var db = new Sequelize('upandrunning', 'dev', 'dev', {
host: 'localhost'
});
var Author = db.define('Author', {
name: Sequelize.STRING,
biography: Sequelize.TEXT
});
var Book = db.define('Book', {
name: Sequelize.STRING
});
Author.hasMany(Book);
Book.hasMany(Author);
db.sync().on('success', function() {
Book.build({
name: 'Through the Storm'
}).save().on('success', function(book) {
console.log('Book saved');
Author.build({
name: 'Lynne Spears',
biography: 'Author and mother of Britney'
}).save().on('success', function(record) {
console.log('Author saved.');
record.setBooks([book]);
record.save().on('success', function() {
console.log('Author & Book Relation created');
});
});
}).on('failure', function(error) {
console.log('Could not save book');
});
}).on('failure', function(error) {
console.log('Failed to sync database');
});
In order to ensure the entities are set up correctly, the author is not created until after the book is successfully saved into the database. Likewise, the book is not added to the author until after the author has been successfully saved into the database. This ensures that both the author's ID and the book's ID are available for Sequelize to establish the association. The output is:
Executing: CREATE TABLE IF NOT EXISTS `AuthorsBooks` (`BookId` INT , `AuthorId` INT ,
`createdAt` DATETIME NOT NULL, `updatedAt` DATETIME NOT NULL, PRIMARY KEY (`BookId`, `AuthorId`)) ENGINE=InnoDB;
Executing: CREATE TABLE IF NOT EXISTS `Authors` (`name` VARCHAR(255), `biography` TEXT, `id` INT NOT NULL auto_increment ,
`createdAt` DATETIME NOT NULL, `updatedAt` DATETIME NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB;
Executing: CREATE TABLE IF NOT EXISTS `Books` (`name` VARCHAR(255), `id` INT NOT NULL auto_increment , `createdAt` DATETIME NOT NULL,
`updatedAt` DATETIME NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB;
Executing: CREATE TABLE IF NOT EXISTS `AuthorsBooks` (`BookId` INT , `AuthorId` INT , `createdAt` DATETIME NOT NULL,
`updatedAt` DATETIME NOT NULL, PRIMARY KEY (`BookId`, `AuthorId`)) ENGINE=InnoDB;
Executing: INSERT INTO `Books` (`name`,`id`,`createdAt`,`updatedAt`)
VALUES ('Through the Storm',NULL,'2011-12-01 20:51:59','2011-12-01 20:51:59');
Book saved
Executing: INSERT INTO `Authors` (`name`,`biography`,`id`,`createdAt`,`updatedAt`)
VALUES ('Lynne Spears','Author and mother of Britney',NULL,'2011-12-01 20:51:59','2011-12-01 20:51:59');
Author saved.
Executing: UPDATE `Authors` SET `name`='Lynne Spears',`biography`='Author and mother of Britney',`id`=3,
`createdAt`='2011-12-01 20:51:59',`updatedAt`='2011-12-01 20:51:59' WHERE `id`=3
Author & Book Relation created
Executing: SELECT * FROM `AuthorsBooks` WHERE `AuthorId`=3;
Executing: INSERT INTO `AuthorsBooks` (`AuthorId`,`BookId`,`createdAt`,`updatedAt`)
VALUES (3,3,'2011-12-01 20:51:59','2011-12-01 20:51:59');
PostgreSQL
PostgreSQL is an object-relational database management system originating from the University of California, Berkeley. The project was started by professor and project leader Michael Stonebraker as a successor to his earlier Ingres database system, and from 1985 to 1993 the Postgres team released 4 versions of the software. By the end of the project the team was overwhelmed by support and feature requests from its growing number of users. After the Berkeley run, open-source developers took over the project, replacing the original QUEL language interpreter with an SQL language interpreter and renaming the project to PostgreSQL. Since the first release of PostgreSQL 6.0 in 1997, the database system has gained a reputation as a feature-rich distribution that is especially friendly to users coming from an Oracle background.
Installation
npm install pg
pg_config is required. It can be found in the libpq-dev package.
Selection
This example assumes you have created a database called upandrunning, granted permission to user dev with password dev, and created a users table with username, firstname, and lastname columns.
Example 6.30. Selecting Data with PostgreSQL
var pg = require('pg');
var connectionString = "pg://dev:dev@localhost:5432/upandrunning";
pg.connect(connectionString, function(err, client) {
if (err) {
console.log( err );
} else {
var sqlStmt = "SELECT username, firstname, lastname FROM users";
client.query( sqlStmt, null, function(err, result) {
if ( err ) {
console.log(err);
} else {
console.log(result);
}
pg.end();
});
}
});
The output is:
{ rows:
[ { username: 'bshilbo',
firstname: 'Bilbo',
lastname: 'Shilbo' } ] }
This is a big difference from the chainable methods used by the MySQL driver. When working with PostgreSQL, it will be up to you to write your own SQL queries directly.
As in previous examples, calling the end() function closes the connection and allows Node's event loop to end.
Insertion, Updates, and Deletion
When typing the SQL queries by hand, as we have seen, it is tempting to throw data values directly into the code through string concatenation, but wise programmers seek out methods that protect against SQL injection attacks. The pg library accepts parameterized queries, which should be leveraged everywhere that you use values taken from external sources (such as forms on web sites).
Example 6.31. Inserting into PostgreSQL
var pg = require('pg');
var connectionString = "pg://dev:dev@localhost:5432/upandrunning";
pg.connect(connectionString, function(err, client) {
if (err) {
console.log( err );
} else {
var sqlStmt = "INSERT INTO users( username, firstname, lastname ) ";
sqlStmt += "VALUES ( $1, $2, $3)";
var sqlParams = ['jdoe', 'John', 'Doe'];
var query = client.query( sqlStmt, sqlParams, function(err, result) {
if ( err ) {
console.log(err);
} else {
console.log(result);
}
pg.end();
});
}
});
The output is:
{ rows: [], command: 'INSERT', rowCount: 1, oid: 0 }
The query command accepts the SQL statement as its first parameter and an array of values as its second. Whereas the MySQL driver used question marks for the parameter values, PostgreSQL uses numbered parameters: $1 refers to the first element of the array, $2 to the second, and so on. Because each placeholder names its position explicitly, the same value can be referenced more than once in a statement without repeating it in the array.
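When the column list grows, writing the numbered placeholders by hand gets tedious. The buildInsert helper below is hypothetical (it is not part of the pg module) and shows one way to generate the statement and its values array from a plain object; the result could then be passed to client.query() exactly as in Example 6.31. Only the values are parameterized, so the table and column names must come from your own code, never from user input.

```javascript
// Hypothetical helper, not part of pg: build a numbered-parameter INSERT.
// Table and column names are inserted as-is, so they must be trusted values.
function buildInsert(table, row) {
  const columns = Object.keys(row);
  // $1, $2, ... match positions in the values array (numbering starts at 1).
  const placeholders = columns.map((_, i) => '$' + (i + 1));
  return {
    text: 'INSERT INTO ' + table + ' ( ' + columns.join(', ') + ' ) ' +
          'VALUES ( ' + placeholders.join(', ') + ' )',
    values: columns.map(column => row[column])
  };
}

const insert = buildInsert('users', { username: 'jdoe', firstname: 'John', lastname: 'Doe' });
console.log(insert.text);
// INSERT INTO users ( username, firstname, lastname ) VALUES ( $1, $2, $3 )
console.log(insert.values); // [ 'jdoe', 'John', 'Doe' ]
// With a connected client: client.query(insert.text, insert.values, callback);
```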
Example 6.32. Updating Data in PostgreSQL
var pg = require('pg');
var connectionString = "pg://dev:dev@localhost:5432/upandrunning";
pg.connect(connectionString, function(err, client) {
if (err) {
console.log( err );
} else {
var sqlStmt = "UPDATE users "
+ "SET firstname = $1 "
+ "WHERE username = $2";
var sqlParams = ['jane', 'jdoe'];
var query = client.query( sqlStmt, sqlParams, function(err, result) {
if ( err ) {
console.log(err);
} else {
console.log(result);
}
pg.end();
});
}
});
Example 6.33. Deleting from PostgreSQL
var pg = require('pg');
var connectionString = "pg://dev:dev@localhost:5432/upandrunning";
pg.connect(connectionString, function(err, client) {
if (err) {
console.log( err );
} else {
var sqlStmt = "DELETE FROM users WHERE username = $1";
var sqlParams = ['jdoe'];
var query = client.query( sqlStmt, sqlParams, function(err, result) {
if ( err ) {
console.log(err);
} else {
console.log(result);
}
pg.end();
});
}
});
Connection Pooling
Production environments are often composed of multiple resources: web servers, caching servers, and database servers. The database is typically hosted on a separate machine from the web server, allowing horizontal growth of the public-facing website without the need for setting up and configuring complex database clusters. Application developers must therefore be aware of the performance implications in accessing resources and how those access costs affect their site's performance.
Connection pooling is an important concept in web development because the performance cost of establishing a database connection is relatively high; creating one or more new connections for every request places an unnecessary burden on a heavily trafficked site and will contribute to weaker performance. The solution is to keep database connections open in a pool once a request is finished with them, so they can be used immediately by the next incoming request.
Many database drivers provide pooling functionality, but that pattern goes against Node's "one module, one purpose" philosophy. Instead, Node developers should use the generic-pool module in front of their data layer to serve new database connections. generic-pool will reuse connections where possible to prevent the overhead of creating new database connections, and can be used with any data library.
Example 6.34. Using the Connection Pool with Node-Db
var mysql = require( 'db-mysql' );
var poolModule = require('generic-pool');
var connectParams = {
'hostname': 'localhost',
'user': 'dev',
'password': 'dev',
'database': 'upandrunning'
};
var pool = poolModule.Pool({
name : 'mysql',
create : function(callback) {
var db = new mysql.Database( connectParams );
db.connect(function(error) {
callback(error, db);
});
},
destroy : function(client) { client.disconnect(); },
max : 10,
idleTimeoutMillis : 3000,
log : true
});
pool.acquire(function(error, client) {
if ( error ) return console.log("Failed to connect");
client.query()
.select(['id', 'user_login'])
.from('users')
.execute(function(error, rows, columns) {
if ( error ) {
console.log("Error on query");
} else {
console.log(rows);
}
pool.release(client);
});
});
The output is:
pool mysql - dispense() clients=1 available=0
pool mysql - dispense() - creating obj - count=1
[ { id: 1, user_login: 'mwilson' } ]
pool mysql - timeout: 1319413992199
pool mysql - dispense() clients=0 available=1
pool mysql - availableObjects.length=1
pool mysql - availableObjects.length=1
pool mysql - removeIdle() destroying obj - now:1319413992211 timeout:1319413992199
pool mysql - removeIdle() all objects removed
The pool works through the magic of the create and destroy functions. When a consumer attempts to acquire a connection, the pool calls the create function if no idle connection is available and fewer than max connections are open. If a connection sits idle for too long (an interval indicated in milliseconds by the idleTimeoutMillis attribute), it is destroyed and its memory resources are freed.
The beauty of generic-pool is that any persistent resource can be represented. Databases are a natural fit, but you can just as easily write commands to maintain connections to an outside session cache, or even hardware interfaces.
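To see what the pool does on your behalf, here is a stripped-down sketch of the same create, reuse, and idle-destroy cycle. SimplePool is hypothetical and written for illustration only; generic-pool handles more cases than this. The option names create, destroy, max, and idleTimeoutMillis mirror the ones used in Example 6.34:

```javascript
// Hypothetical minimal pool illustrating create/reuse/idle-destroy;
// not generic-pool's implementation.
class SimplePool {
  constructor(options) {
    this.create = options.create;     // function(callback(err, resource))
    this.destroy = options.destroy;   // function(resource)
    this.max = options.max || 10;
    this.idleTimeoutMillis = options.idleTimeoutMillis || 30000;
    this.size = 0;      // resources alive, whether idle or in use
    this.idle = [];     // { resource, timer } entries ready for reuse
    this.waiting = [];  // acquire callbacks queued while the pool is full
  }

  acquire(callback) {
    const entry = this.idle.pop();
    if (entry) {
      clearTimeout(entry.timer);      // reused before its idle timeout fired
      return callback(null, entry.resource);
    }
    if (this.size < this.max) {
      this.size++;
      return this.create((err, resource) => {
        if (err) {
          this.size--;
          return callback(err);
        }
        callback(null, resource);
      });
    }
    this.waiting.push(callback);      // at max: wait for a release
  }

  release(resource) {
    const next = this.waiting.shift();
    if (next) return next(null, resource); // hand straight to a waiter
    const entry = { resource: resource, timer: null };
    entry.timer = setTimeout(() => {
      // Idle too long: remove it from the pool and free it.
      this.idle.splice(this.idle.indexOf(entry), 1);
      this.size--;
      this.destroy(resource);
    }, this.idleTimeoutMillis);
    entry.timer.unref();              // don't keep the process alive for this timer
    this.idle.push(entry);
  }
}

let created = 0;
const pool = new SimplePool({
  create: callback => callback(null, { id: ++created }),
  destroy: () => {},
  max: 2,
  idleTimeoutMillis: 1000
});

pool.acquire((err, conn) => {
  pool.release(conn);
  pool.acquire((err2, again) => {
    console.log(again === conn, created); // true 1
    pool.release(again);
  });
});
```

The second acquire gets back the same object without calling create again, which is exactly the saving a connection pool exists to provide.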
MQ Protocols
A mailman analogy was used earlier to describe Node's event loop. If the mailman were to arrive at a closed gate, he would be unable to deliver his message; but imagine an elderly and kind groundskeeper was in the process of opening the gate so the mailman could pass through. Being elderly and somewhat frail from his years of service, it takes the groundskeeper some time to clear the way—time during which the mailman is unable to deliver any messages.
This situation is a blocking process, but it is not a permanent state. Eventually the groundskeeper will manage to get the gate open and the mailman will go about his way. Every house the mailman reaches with a similar gate-opening process will slow down the overall route. In the context of a Node application, this type of block will seriously degrade performance.
In the computer realm, similar situations may be caused by sending email during a registration process, by lots of math that needs to be done as a result of user input, or by any situation where the time it takes to complete a task would be noticeable to the user beyond normally expected wait times. Node's event-driven design handles the majority of these situations for you by using asynchronous functions and callbacks, but when an event is particularly 'heavy' to process, it doesn't make sense to process it inside Node. Node should only take care of handling results and fast operations.
By way of example, consider a generic user registration process. When a user registers herself, the application saves a new record in the database, sends an email to that user, and perhaps records some statistics about the registration process, such as the number of steps completed or the amount of time taken. It probably doesn't make sense to perform all of those actions right away when the user hits the Submit button on your web page. For one thing, the email process could take several seconds (or, if you're unlucky, minutes) to complete, the database call may not need to finish before the user is welcomed, and the statistics are probably separate from your main application flow. In this case, you might choose to generate a message instead, which notifies other parts of your application (perhaps running on a different machine entirely) that a user has registered. This is known as a publish-subscribe pattern.
Another example: Suppose you have a cluster of machines running Node.js. When a new machine is added to the cluster, it issues a message requesting configuration information. A configuration server responds to the message with a list of configuration information the new machine needs to integrate into the cluster. This is known as a request-reply pattern.
Message queues allow programmers to publish events and move on, enabling improved performance through parallel processing, and higher levels of scalability through inter-process communication channels.
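The publish-subscribe shape of the registration example can be sketched in-process with Node's events module. This is only an analogy: an EventEmitter calls its listeners synchronously within the same process, whereas a message broker such as RabbitMQ delivers messages to subscribers that may run in other processes or on other machines. The registerUser function and the 'user.registered' event name are hypothetical.

```javascript
const EventEmitter = require('events');

// In-process stand-in for a message broker (analogy only).
const bus = new EventEmitter();
const handled = [];

// Subscribers: each reacts to the same event for its own reason, and the
// publisher does not need to know that either of them exists.
bus.on('user.registered', user => handled.push('email to ' + user.email));
bus.on('user.registered', user => handled.push('stats for user ' + user.id));

// Publisher: save the user (omitted here), announce the event, and move on.
function registerUser(user) {
  bus.emit('user.registered', user);
  return 'Welcome, ' + user.name;
}

console.log(registerUser({ id: 7, name: 'Jane', email: 'jane@example.com' }));
// Welcome, Jane
console.log(handled);
// [ 'email to jane@example.com', 'stats for user 7' ]
```

Adding a third subscriber requires no change to registerUser, which is the decoupling a real message queue provides across machines as well.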
RabbitMQ
RabbitMQ is a message broker that supports the advanced message queueing protocol (AMQP). It is useful in situations where data needs to be communicated between different servers, or between different processes on the same server. Written in Erlang, RabbitMQ is capable of clustering for high availability, and is fairly straightforward to install and begin using.
Installing RabbitMQ
If you're using Linux, RabbitMQ is available in package form for most distributions. Anyone can download the software from rabbitmq.com and compile it from source.
Once RabbitMQ has been installed and is running, use npm to retrieve Node's AMQP drivers:
npm install amqp
Publish and Subscribe
RabbitMQ communicates using the standardized Advanced Message Queuing Protocol (AMQP). AMQP comes from the financial services industry, where reliable messaging is life or death. It provides a vendor-neutral and abstract specification for generic (not just financial) middleware messaging problems, and is intended to solve the problem of communicating between different types of systems. AMQP is conceptually similar to email: email messages have specifications for headers and format, but their contents can be anything from text to photos and video. Just as two companies don't need to run the same email server software to communicate, AMQP allows messaging between different platforms. For example, a publisher written in PHP can send a message to a consumer written in JavaScript.
The following example shows the most basic elements of RabbitMQ programming.
Example 6.35. AMQP/RabbitMQ Usage
var connection = require('amqp').createConnection();
connection.on('ready', function() {
console.log('Connected to ' + connection.serverProperties.product);
var e = connection.exchange('up-and-running');
var q = connection.queue('up-and-running-queue');
q.on('queueDeclareOk', function(args) {
console.log('Queue opened');
q.bind(e, '#');
q.on('queueBindOk', function() {
console.log('Queue bound');
q.on('basicConsumeOk', function() {
console.log("Consumer has subscribed, publishing message.");
e.publish('routingKey', {hello:'world'});
});
});
q.subscribe(function(msg) {
console.log('Message received:');
console.log(msg);
connection.end();
});
});
});
The output is:
Connected to RabbitMQ
Queue opened
Queue bound
Consumer has subscribed, publishing message.
Message received:
{ hello: 'world' }
The createConnection command opens a connection to the RabbitMQ message broker, which in this case defaults (as per AMQP) to localhost on port 5672. If necessary, you can pass connection options to this command, for example:
createConnection({host: 'dev.mycompany.com', port: 5555})
Next, a queue and exchange are defined. This step is not strictly required, because AMQP brokers are required to provide a default exchange, but by specifying up-and-running as the exchange name, your application will be insulated from other exchanges that could be running on the server. An exchange is an entity that receives messages and passes them forward to attached queues.
The queue doesn't do anything by itself; it must be bound to an exchange before it will do anything. The command q.bind(e, '#') instructs AMQP to attach the queue named 'up-and-running-queue' to the exchange named 'up-and-running', and to listen for all messages passed to the exchange (the '#' parameter). You could easily change the # to some specific key to filter out messages.
Once the queue and exchange have been declared, an event is set up for basicConsumeOk, which is an event generated by the AMQP library when a client subscribes to a queue. When that happens, Node will publish a "hello world" message to the exchange under a routing key of routingKey. In this example, the routing key doesn't matter because the queue is bound to all keys (using the bind('#') command), but a key tenet of AMQP is that the publisher is never aware of which subscribers (if any) are connected, so a routing key is supplied in any case.
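The '#' binding works because of how routing keys are matched. Assuming the exchange behaves as an AMQP topic exchange (the kind of exchange the '#' wildcard implies), routing keys are dot-separated words, '*' in a binding key matches exactly one word, and '#' matches zero or more words. The topicMatches function below is a hypothetical re-implementation of those rules for illustration; in practice the broker does this matching for you:

```javascript
// Hypothetical re-implementation of AMQP topic matching, for illustration.
// '*' matches exactly one dot-separated word; '#' matches zero or more words.
function topicMatches(bindingKey, routingKey) {
  const pattern = bindingKey.split('.');
  const words = routingKey.split('.');

  // Does pattern[i..] match words[j..]?
  function match(i, j) {
    if (i === pattern.length) return j === words.length;
    if (pattern[i] === '#') {
      // Let '#' absorb 0, 1, 2, ... words and try to match the rest.
      for (let k = j; k <= words.length; k++) {
        if (match(i + 1, k)) return true;
      }
      return false;
    }
    if (j === words.length) return false;
    if (pattern[i] === '*' || pattern[i] === words[j]) return match(i + 1, j + 1);
    return false;
  }

  return match(0, 0);
}

console.log(topicMatches('#', 'routingKey'));                 // true
console.log(topicMatches('user.*', 'user.registered'));       // true
console.log(topicMatches('user.*', 'user.registered.email')); // false
console.log(topicMatches('user.#', 'user.registered.email')); // true
console.log(topicMatches('order.#', 'user.registered'));      // false
```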
Finally, the subscribe command is issued. The callback function that is passed as its argument is called every time an eligible message is received by the exchange and passed through to the queue. In this case, the callback causes the program to end, which is good for demonstration purposes, but in "real" applications you would not likely do this. When the subscribe command is successful, AMQP dispatches the basicConsumeOk event, which triggers the publishing of the "hello world" message and subsequently the end of the demonstration program.
Work Queues
Queues are useful when long-running tasks take longer than is acceptable to the user (such as during a web page load) or when the task would otherwise block the application. Using RabbitMQ, it is possible to split tasks among multiple workers as well as to ensure tasks are completed even if the first worker that handles them dies mid-process.
Example 6.36. Publishing Long Jobs with AMQP
var connection = require('amqp').createConnection();
var count = 0;
connection.on('ready', function() {
console.log('Connected to ' + connection.serverProperties.product);
var e = connection.exchange('up-and-running');
var q = connection.queue('up-and-running-queue');
q.on('queueDeclareOk', function(args) {
console.log('Queue opened');
q.bind(e, '#');
q.on('queueBindOk', function() {
console.log('Queue bound');
setInterval(function(){
console.log('Publishing message #' + ++count);
e.publish('routingKey', {count:count});
}, 1000);
});
});
});
This example is a modified version of the straight publish/subscribe example from the previous section, but it is only a publisher, so the event listener for subscribing is gone. In its place is an interval timer that publishes a message to the exchange every 1000 milliseconds (that is, every second). The message contains a count variable that is incremented during each publish. This code can be used to implement a simple worker application. The corresponding client is:
Example 6.37. Processing Long Jobs with AMQP
var connection = require('amqp').createConnection();
function sleep(milliseconds)
{
var start = new Date().getTime();
while (new Date().getTime() < start + milliseconds);
}
connection.on('ready', function() {
console.log('Connected to ' + connection.serverProperties.product);
var e = connection.exchange('up-and-running');
var q = connection.queue('up-and-running-queue');
q.on('queueDeclareOk', function(args) {
q.bind(e,'#');
q.subscribe({ack:true},function(msg) {
console.log('Message received:');
console.log(msg.count);
sleep(5000);
console.log('Processed. Waiting for next message.');
q.shift();
});
});
});
The client works by taking a message from the queue, processing it (in this example, sleeping for 5 seconds), then taking the next message from the queue and repeating. Although there is no "sleep" function in Node, you can fake it with a blocking loop as done here; because the loop blocks the whole process, this is acceptable only for demonstration.
There is a problem. Recall that the publisher posts a message to the queue every second. Since the client takes 5 seconds to process each message, it will very quickly fall far behind the publisher. The solution? Open another window and run a second client; now the messages are processed twice as fast. That is still not quick enough to handle the volume produced by the publisher, but adding more clients spreads the load further: at one message per second and five seconds per message, five clients are needed to keep the backlog of unprocessed messages from growing. This setup is referred to as worker queues.
Worker queues function by round-robining message delivery between the clients connected to a named queue. The {ack:true} parameter to the subscribe command instructs AMQP to wait for the client to acknowledge that processing has been completed for a message. The shift method provides that acknowledgement by shifting the message off the queue and removing it from service. This way, if the worker happens to die while processing a message, the RabbitMQ broker will send the message to the next available client. There is no timeout; as long as the client is connected, the message will be kept out of play. Only when the client disconnects without acknowledging a message will it be sent to the next client.
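The bookkeeping behind acknowledged delivery can be modeled in a few lines. ToyBroker below is a hypothetical, single-process illustration (real RabbitMQ also handles persistence, prefetch limits, and network failures): it hands messages to workers in round-robin order, remembers which messages each worker has not yet acknowledged, and puts those back at the front of the queue when that worker disconnects.

```javascript
// Hypothetical toy model of acknowledged, round-robin delivery.
class ToyBroker {
  constructor() {
    this.ready = [];            // messages waiting to be delivered
    this.workers = [];          // connected worker names, in subscribe order
    this.unacked = new Map();   // worker name -> messages delivered but not acked
    this.turn = 0;              // round-robin position
  }

  publish(message) { this.ready.push(message); }

  subscribe(worker) {
    this.workers.push(worker);
    this.unacked.set(worker, []);
  }

  // Deliver the next ready message to the next worker in turn.
  dispatch() {
    if (this.ready.length === 0 || this.workers.length === 0) return null;
    const worker = this.workers[this.turn % this.workers.length];
    this.turn++;
    const message = this.ready.shift();
    this.unacked.get(worker).push(message);
    return { worker: worker, message: message };
  }

  // The equivalent of q.shift(): the worker confirms it is done.
  ack(worker, message) {
    const pending = this.unacked.get(worker) || [];
    const index = pending.indexOf(message);
    if (index !== -1) pending.splice(index, 1);
  }

  // Unacknowledged messages of a disconnected worker go back to the queue.
  disconnect(worker) {
    this.ready.unshift(...(this.unacked.get(worker) || []));
    this.unacked.delete(worker);
    this.workers = this.workers.filter(w => w !== worker);
  }
}

const broker = new ToyBroker();
[1, 2, 3].forEach(count => broker.publish({ count: count }));
broker.subscribe('worker-a');
broker.subscribe('worker-b');
const first = broker.dispatch();   // worker-a gets count 1
const second = broker.dispatch();  // worker-b gets count 2
broker.ack(first.worker, first.message);
broker.disconnect('worker-b');     // dies before acknowledging count 2
const third = broker.dispatch();   // count 2 is redelivered, to worker-a
console.log(third.worker, third.message.count); // worker-a 2
```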
Warning
A common "gotcha" is omitting the q.shift() command. If it is forgotten, your program will continue to function as normal, but as soon as your client disconnects, the server will place all of the messages the client processed back onto the queue.
Another side effect is that the memory usage by RabbitMQ will gradually rise. This is because, although the messages are removed from active duty on the queue, they are kept in memory until they are acknowledged and deleted by the client.