Image Credit (http://blog.hqc.sk.ca/wp-content/uploads/2012/12/Queue-2012-12-11.jpg)
Queues are often used to make applications scalable by offloading data and processing it later.
In this post I am going to use the Beanstalk queue management system from Python. Before I get into the real task, allow me to give a brief intro to Beanstalk.
What is Beanstalk?
From the official website:
Beanstalk is a simple, fast work queue.
Its interface is generic, but was originally designed for reducing the latency of page views in high-volume web applications by running time-consuming tasks asynchronously.
Its daemon is called beanstalkd, which you can run on *nix based machines. Since I am on OSX, I ran brew install beanstalkd to install it.
Once installed, you can start its daemon by running the following command:
./beanstalkd -l 1.2.3.4 -p 11300
where 1.2.3.4 is the IP of the machine the queue is running on and 11300 is the port number.
You can also run it with just:
./beanstalkd
and it will run on localhost. You can also enable verbose mode by passing the -V switch.
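Once the daemon is up, you can quickly check that it is reachable from Python. Below is a minimal sketch using pystalkd (the library used later in this post), assuming it mirrors beanstalkc’s Connection API with tubes() and close():

from pystalkd.Beanstalkd import Connection

# connect to the locally running beanstalkd daemon
c = Connection("localhost", 11300)

# a fresh server should report a single tube called 'default'
# (assumes pystalkd exposes a beanstalkc-style tubes() method)
print(c.tubes())

c.close()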
Queue implementation
Our goal is to use scrapers that hold a list of URLs to be scraped along with the respective parsed data. Ideally the data is stored in a database, but the number of URLs keeps growing, which puts a lot of burden on our MySQL server due to extensive I/O operations and makes the entire system slow and less efficient. Because the data is being saved in realtime, MySQL response times suffer.
So how to cope with it? What I am going to do is create two named queues, also called tubes. Let’s name them unprocessed and parsed respectively. Links, which we assume are stored in a Db table, will be put into the unprocessed tube as jobs; the consumer app will pull each link, scrape it, and put the parsed data into the parsed tube.
The first script I am going to make is called producer.py. This script will put URLs into a named queue, aka tube.
from pystalkd.Beanstalkd import Connection

links1 = []
links1.append('http://1.com')
links1.append('http://2.com')
links1.append('http://3.com')

c = Connection("localhost", 11300)

print('Putting jobs in links')
c.use('unprocessed')  # Unprocessed links
for l in links1:
    c.put(str(l))
After importing the pystalkd library, I appended arbitrary links to a list and then created a connection to the beanstalkd queue server running on port 11300. After that I called use to select a named queue, or tube, and put the links into it.
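As a side note, put accepts more than just the job body. Here is a minimal sketch, assuming pystalkd follows beanstalkc’s put(body, priority, delay, ttr) signature, where a lower priority number is reserved first, delay postpones the job, and ttr (time-to-run) limits how long a consumer may hold it before it goes back to the ready state:

from pystalkd.Beanstalkd import Connection

c = Connection("localhost", 11300)
c.use('unprocessed')

# lower priority value = reserved earlier; delay and ttr are in seconds
# (keyword arguments assume the beanstalkc-style put signature)
c.put('http://4.com', priority=1024, delay=5, ttr=60)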
I am also doing one more thing: opening a TELNET connection to the queue server to run different commands, in our case the stats command. Open another terminal window and run the command:
telnet localhost 11300
and then execute the stats command in it. Initially, if you run stats you will see something like this:
$ telnet localhost 11300
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
stats
OK 898
---
current-jobs-urgent: 0
current-jobs-ready: 0
current-jobs-reserved: 0
current-jobs-delayed: 0
current-jobs-buried: 0
cmd-put: 0
cmd-peek: 0
cmd-peek-ready: 0
cmd-peek-delayed: 0
cmd-peek-buried: 0
cmd-reserve: 0
cmd-reserve-with-timeout: 0
cmd-delete: 0
cmd-release: 0
cmd-use: 0
cmd-watch: 0
cmd-ignore: 0
cmd-bury: 0
cmd-kick: 0
cmd-touch: 0
cmd-stats: 1
cmd-stats-job: 0
cmd-stats-tube: 0
cmd-list-tubes: 0
cmd-list-tube-used: 0
Everything is in its default state. Now I run the producer.py code, which inserts the links into the queue. stats now looks like this:
stats
OK 900
---
current-jobs-urgent: 0
current-jobs-ready: 3
current-jobs-reserved: 0
current-jobs-delayed: 0
current-jobs-buried: 0
cmd-put: 3
cmd-peek: 0
cmd-peek-ready: 0
cmd-peek-delayed: 0
cmd-peek-buried: 0
cmd-reserve: 0
cmd-reserve-with-timeout: 0
cmd-delete: 0
cmd-release: 0
cmd-use: 1
cmd-watch: 0
cmd-ignore: 0
cmd-bury: 0
cmd-kick: 0
cmd-touch: 0
cmd-stats: 2
cmd-stats-job: 0
cmd-stats-tube: 0
cmd-list-tubes: 0
cmd-list-tube-used: 0
cmd-list-tubes-watched: 0
cmd-pause-tube: 0
job-timeouts: 0
total-jobs: 3
max-job-size: 65535
current-tubes: 2
current-connections: 1
current-jobs-ready and cmd-put are both set to 3 since we added 3 links to the queue.
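You do not have to keep a telnet session open to check these counters; the same numbers are available from Python. A small sketch, assuming pystalkd exposes a beanstalkc-style stats_tube() method that returns these counters as a dict:

from pystalkd.Beanstalkd import Connection

c = Connection("localhost", 11300)

# per-tube counters, equivalent to running 'stats-tube unprocessed' over telnet
# (assumes pystalkd mirrors beanstalkc's stats_tube())
tube_stats = c.stats_tube('unprocessed')
print(tube_stats['current-jobs-ready'])  # 3 after running producer.py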
Now I am going to make another script called consumer.py, which is going to consume these links for parsing purposes and store the info into another named queue/tube.
from pystalkd.Beanstalkd import Connection

processed = []

# dummy method to deal with scraping
def parse(u):
    return 'Processed the link:- ' + u

c = Connection("localhost", 11300)
c.watch('unprocessed')  # pulling links from tube for parsing

while True:
    job = c.reserve(0)
    if job is not None:
        processed.append(parse(job.body))
        job.delete()  # Delete so it does not haunt us back
    if job is None:
        break

c.use('parsed')  # Storing scraped and parsed data into another tube for later db process
if len(processed) > 0:
    for p in processed:
        c.put(p)
Again, after making a connection it’s time to pull links from the queue for scraping. In producer.py we used use; here watch is used since we are going to pull the links. I made a dummy parse() method that does the processing and returns the data, which is appended to a list. Each job is then deleted so it does not come back and haunt us. The loop ends as soon as all jobs are dequeued.
Now it’s time to put the parsed data; use is called again, but this time for the parsed tube.
So simple. I am ending it here, but in a third step you would pull the parsed data from the parsed tube and store it into the db.
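That third step would look much like consumer.py, only watching the parsed tube and writing to the database instead. A rough sketch, where save_to_db() is a hypothetical placeholder for your actual MySQL insert:

from pystalkd.Beanstalkd import Connection

def save_to_db(row):
    # hypothetical placeholder: replace with your actual MySQL insert
    print('Saving to db:', row)

c = Connection("localhost", 11300)
c.watch('parsed')  # this time we consume the parsed data

while True:
    job = c.reserve(0)
    if job is None:
        break
    save_to_db(job.body)
    job.delete()  # remove the job once it has been persisted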
This entire exercise was done to make the system scalable. Notice that all the intermediate work has been taken off the MySQL db and is being handled by Beanstalkd. The Db is only used at the first and last stage.
That’s it. This beginner tutorial should help you give beanstalkd a try and make your scraper scalable across multiple machines. It is very simple to use for small to medium applications.
Planning to write a book about Web Scraping in Python. Click here to give your feedback