Image Credit (http://blog.hqc.sk.ca/wp-content/uploads/2012/12/Queue-2012-12-11.jpg)
Queues are often used to make applications scalable by offloading work so that it can be processed later.
In this post I am going to use the Beanstalk queue management system from Python. Before I get into the real task, allow me to give a brief intro to Beanstalk.
What is Beanstalk?
From the official website:
Beanstalk is a simple, fast work queue.
Its interface is generic, but was originally designed for reducing the latency of page views in high-volume web applications by running time-consuming tasks asynchronously.
Its daemon is called beanstalkd, and you can run it on *nix based machines. Since I am on OSX, I ran
brew install beanstalkd to install it.
Once installed, you can start the daemon by running the following command:
./beanstalkd -l 220.127.116.11 -p 11300
The address passed to -l is the IP of the machine the queue is running on, and
11300 (passed to -p) is the port number.
You can also start it with no arguments:
./beanstalkd on its own will run on
localhost. You can also enable verbose mode by enabling
Our goal is to run scrapers that hold a list of URLs to be scraped along with the respective parsed data. Ideally the data is stored in a database, but as the number of URLs grows, the extensive I/O puts a heavy burden on our MySQL server. Because the data is saved in real time, MySQL's response time suffers and the entire system becomes slow and less efficient.
So how do we cope with it? I am going to make two named queues, also called tubes. Let's name them unprocessed and parsed. The links, which we assume are stored in a DB table, will be put in the unprocessed tube. The consumer app, the script that pulls the links, will then process each one and put the parsed data in the parsed tube.
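Before bringing beanstalkd in, the flow between the two tubes can be sketched in memory. This is only an illustration: Python's standard queue.Queue stands in for the tubes, and produce(), consume() and parse() are hypothetical names, not part of any Beanstalk client.

```python
from queue import Queue, Empty

# In-memory stand-ins for the two beanstalkd tubes (single process only).
tubes = {"unprocessed": Queue(), "parsed": Queue()}


def produce(links):
    """Put each link into the 'unprocessed' tube."""
    for link in links:
        tubes["unprocessed"].put(link)


def parse(link):
    """Hypothetical placeholder for the real scraping and parsing work."""
    return "Processed the link:- " + link


def consume():
    """Move every waiting link through parse() into the 'parsed' tube."""
    moved = 0
    while True:
        try:
            link = tubes["unprocessed"].get_nowait()
        except Empty:
            break  # nothing left to process
        tubes["parsed"].put(parse(link))
        moved += 1
    return moved


produce(["http://1.com", "http://2.com"])
moved = consume()
print(moved, "links moved to the parsed tube")
```

With beanstalkd the two functions would live in separate scripts, possibly on separate machines, which is the whole point of putting a queue server in the middle.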
The first script I am going to make is called
producer.py. This script will put URLs into a named queue, aka tube.
    from pystalkd.Beanstalkd import Connection

    # Arbitrary links standing in for rows read from the DB table
    links1 = []
    links1.append('http://1.com')
    links1.append('http://2.com')
    links1.append('http://3.com')

    c = Connection("localhost", 11300)
    print('Putting jobs in links')
    c.use('unprocessed')  # Unprocessed links
    for l in links1:
        c.put(str(l))
After importing the pystalkd library, I appended arbitrary links to the list and then created a connection to the beanstalk queue server running on port
11300. After that I called
use to select a named queue, or tube, and put the links in it.
I am also doing another thing: opening a
TELNET connection to the queue server to run different commands, in our case the
stats command. Open another terminal window, run
telnet localhost 11300 and then execute the
stats command in it. If you run
stats at this initial stage, you will see something like this:
    $ telnet localhost 11300
    Trying ::1...
    telnet: connect to address ::1: Connection refused
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    stats
    OK 898
    ---
    current-jobs-urgent: 0
    current-jobs-ready: 0
    current-jobs-reserved: 0
    current-jobs-delayed: 0
    current-jobs-buried: 0
    cmd-put: 0
    cmd-peek: 0
    cmd-peek-ready: 0
    cmd-peek-delayed: 0
    cmd-peek-buried: 0
    cmd-reserve: 0
    cmd-reserve-with-timeout: 0
    cmd-delete: 0
    cmd-release: 0
    cmd-use: 0
    cmd-watch: 0
    cmd-ignore: 0
    cmd-bury: 0
    cmd-kick: 0
    cmd-touch: 0
    cmd-stats: 1
    cmd-stats-job: 0
    cmd-stats-tube: 0
    cmd-list-tubes: 0
    cmd-list-tube-used: 0
Everything is in its default state. Now I run
producer.py, which will insert the links into the queue.
The stats now look like this:
    stats
    OK 900
    ---
    current-jobs-urgent: 0
    current-jobs-ready: 3
    current-jobs-reserved: 0
    current-jobs-delayed: 0
    current-jobs-buried: 0
    cmd-put: 3
    cmd-peek: 0
    cmd-peek-ready: 0
    cmd-peek-delayed: 0
    cmd-peek-buried: 0
    cmd-reserve: 0
    cmd-reserve-with-timeout: 0
    cmd-delete: 0
    cmd-release: 0
    cmd-use: 1
    cmd-watch: 0
    cmd-ignore: 0
    cmd-bury: 0
    cmd-kick: 0
    cmd-touch: 0
    cmd-stats: 2
    cmd-stats-job: 0
    cmd-stats-tube: 0
    cmd-list-tubes: 0
    cmd-list-tube-used: 0
    cmd-list-tubes-watched: 0
    cmd-pause-tube: 0
    job-timeouts: 0
    total-jobs: 3
    max-job-size: 65535
    current-tubes: 2
    current-connections: 1
cmd-put is set to 3 since we added 3 links to the queue, and current-jobs-ready is also 3 because nothing has consumed them yet.
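If you would rather read these counters from a script than from a telnet session, the reply is easy to parse because it is just "key: value" lines. A rough sketch, assuming the reply has the same shape as the output in this post (an OK line with a byte count, then the data); parse_stats() and fetch_stats() are hypothetical helper names, and fetch_stats() needs a running beanstalkd to be of any use.

```python
import socket


def parse_stats(payload):
    """Turn the 'key: value' lines of a stats reply into a dict."""
    stats = {}
    for line in payload.splitlines():
        key, sep, value = line.partition(": ")
        if not sep:
            continue  # skips the 'OK <bytes>' header and the '---' marker
        value = value.strip()
        stats[key.strip()] = int(value) if value.isdigit() else value
    return stats


def fetch_stats(host="localhost", port=11300):
    """Send the stats command over a raw socket and return the parsed reply."""
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(b"stats\r\n")
        reader = sock.makefile("rb")
        status, _, size = reader.readline().decode().strip().partition(" ")
        if status != "OK":
            raise RuntimeError("unexpected reply: " + status)
        body = reader.read(int(size)).decode()
    return parse_stats(body)


# A trimmed-down copy of the output above, so this runs without a server.
sample = "OK 900\n---\ncurrent-jobs-ready: 3\ncmd-put: 3\ncmd-use: 1\n"
counts = parse_stats(sample)
print(counts["cmd-put"])
```

Calling fetch_stats() right after producer.py has run should report the same numbers you see in the telnet window.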
Now I am going to make another script called
consumer.py, which is going to consume these links for parsing and store the results in another named queue/tube.
    from pystalkd.Beanstalkd import Connection

    processed = []


    # dummy method to deal with scraping
    def parse(u):
        return 'Processed the link:- ' + u


    c = Connection("localhost", 11300)
    c.watch('unprocessed')

    # pulling links from tube for parsing
    while True:
        job = c.reserve(0)
        if job is None:
            break  # nothing left in the tube
        processed.append(parse(job.body))
        job.delete()  # Delete so it does not haunt us back

    c.use('parsed')
    # Storing scraped and parsed data into another tube for later db process
    for p in processed:
        c.put(p)
Again, after making the connection it's time to pull links from the queue for scraping. In
producer.py we used
use; now
watch will be used, since we are going to pull the links. I made a dummy
parse() method that does the processing, and whatever it returns is appended to the
processed list. The job is then deleted so it does not come back and haunt us. The loop ends as soon as all jobs are dequeued, which is when reserve(0) returns None.
Now it's time to put the parsed data into a queue. Again,
use will be used, but this time for the parsed tube.
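One thing the loop in consumer.py does not cover is a link that fails to parse. Here is a minimal sketch of one way to deal with it, assuming the client's job object offers a bury() method next to delete() (the stats output has a cmd-bury counter, but do check your client library for the exact call). process_jobs(), FakeJob and FakeConnection are hypothetical names; the fakes only exist so the sketch runs without a queue server.

```python
def process_jobs(conn, handler):
    """Reserve ready jobs one by one until none are left."""
    done, failed = [], 0
    while True:
        job = conn.reserve(0)  # same call as consumer.py; None means nothing is ready
        if job is None:
            break
        try:
            result = handler(job.body)
        except Exception:
            job.bury()  # assumption: sets the failed job aside instead of losing it
            failed += 1
            continue
        done.append(result)
        job.delete()  # delete only once the work has succeeded
    return done, failed


class FakeJob:
    """In-memory stand-in for a reserved job."""

    def __init__(self, body):
        self.body = body
        self.state = "reserved"

    def delete(self):
        self.state = "deleted"

    def bury(self):
        self.state = "buried"


class FakeConnection:
    """In-memory stand-in for a queue connection."""

    def __init__(self, bodies):
        self.jobs = [FakeJob(b) for b in bodies]
        self._pending = list(self.jobs)

    def reserve(self, timeout=None):
        return self._pending.pop(0) if self._pending else None


def parse(url):
    if not url.startswith("http://"):
        raise ValueError("unsupported link: " + url)
    return "Processed the link:- " + url


conn = FakeConnection(["http://1.com", "ftp://2.com", "http://3.com"])
done, failed = process_jobs(conn, parse)
print(len(done), "processed,", failed, "buried")
```

The design choice here is simply to delete a job after the work is done rather than before, so a crash or a bad link never silently drops a URL.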
So simple. I am ending it here, but in the 3rd step you would pull the parsed data back out of the parsed tube and store it in the DB.
This entire exercise was done to make the system scalable. Notice that all the intermediate work has been taken off the MySQL DB and is being taken care of by Beanstalkd. The DB is only used at the first and last stages.
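That 3rd step could look roughly like the sketch below. It is not the code I run in production: sqlite3 from the standard library stands in for MySQL, the parsed_data table and store_parsed() are made-up names, and StubConnection is a tiny in-memory stand-in so you can try it without a queue server. With the real thing you would pass the pystalkd Connection from the scripts above instead.

```python
import sqlite3
from types import SimpleNamespace


def store_parsed(conn, db):
    """Drain the 'parsed' tube into a table, one row per job."""
    db.execute("CREATE TABLE IF NOT EXISTS parsed_data (body TEXT)")
    conn.watch("parsed")
    saved = 0
    while True:
        job = conn.reserve(0)  # None once the tube is empty
        if job is None:
            break
        db.execute("INSERT INTO parsed_data (body) VALUES (?)", (job.body,))
        db.commit()   # make the row durable first...
        job.delete()  # ...then remove the job from the queue
        saved += 1
    return saved


class StubConnection:
    """In-memory stand-in for a queue connection (hypothetical)."""

    def __init__(self, bodies):
        self.bodies = list(bodies)

    def watch(self, tube):
        pass  # a real connection would start watching the tube here

    def reserve(self, timeout=None):
        if not self.bodies:
            return None
        return SimpleNamespace(body=self.bodies.pop(0), delete=lambda: None)


db = sqlite3.connect(":memory:")
stub = StubConnection(["Processed the link:- http://1.com",
                       "Processed the link:- http://2.com"])
saved = store_parsed(stub, db)
rows = [r[0] for r in db.execute("SELECT body FROM parsed_data ORDER BY rowid")]
print(saved, "rows stored")
```

Committing before deleting means that if the script dies halfway, the worst case is a duplicate row rather than lost data.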
That's it. This beginner tutorial should help you give
beanstalkd a try and make your scraper scalable across multiple machines. It is very simple to use for small to medium applications.