Archive for

November 2011

A Light-Weight Work Distribution Queue with ZeroMQ

Working with the Twitter Streaming API, one of our challenges has been to efficiently even out the spikes in tweet volume. We have a fixed number of resources crunching the tweets coming off the stream. Breaking news happens, tweets spike, and our backends go crazy.

We consume tweets off the limited public firehose and the site stream endpoints of the Twitter API. The public firehose comes off a single stream, but the site stream API has a limit of 100 users per connection, so we hold multiple connections to the site stream API to listen to all the user streams. All the public statuses coming off these streams are combined and duplicates are dropped; the remaining ones are processed into the system. We spawn the processes handling the streams on one of the backend nodes, and each tweet coming off a stream spawns a separate process to do the processing. Under normal load this works fine, since these processes are short-lived and clear off quickly, leaving the system in a healthy state.

When spikes occur, the number of spawned processes spikes, the messages in the stream handlers spike, and sometimes the node nukes itself by running out of memory. So we suppressed process spawns whenever the process count on the node was too high. This meant lost tweets, but a few missed tweets were OK in our scheme of things. That's when the message queues of the streamers started going bust: too many messages were coming in from the stream, and we were not clearing the queue fast enough. Process message queues can shoot you in the foot if they are not managed properly. If they are not cleared fast enough, they will use up all your memory and bring you down real quick. We did not want to introduce an external message queue application into the mix; we wanted a simple solution to the problem at hand.
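
As a rough illustration of the two steps described above (dropping duplicates across the combined streams, then refusing to start new work when the node is already busy), here is a minimal Python sketch. It is not the Erlang code running in production: `RecentIds`, `admit`, the `"id"` field, and the `max_workers` cap are all hypothetical names chosen for the example, and the bounded "recently seen" window is an assumption about how duplicates might be tracked.

```python
from collections import OrderedDict


class RecentIds:
    """Bounded record of recently seen tweet ids; the oldest id is evicted first."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._ids = OrderedDict()

    def first_time(self, tweet_id):
        """Return True if tweet_id has not been seen within the current window."""
        if tweet_id in self._ids:
            return False
        self._ids[tweet_id] = True
        if len(self._ids) > self.capacity:
            self._ids.popitem(last=False)  # forget the oldest id
        return True


def admit(tweets, seen, active_workers, max_workers):
    """Pick the tweets that should get a worker.

    Duplicates are dropped first. After that, tweets are shed (lost for good)
    once the number of busy workers would reach max_workers.
    """
    accepted = []
    for tweet in tweets:
        if not seen.first_time(tweet["id"]):
            continue  # duplicate delivered by another stream connection
        if active_workers + len(accepted) >= max_workers:
            continue  # node is too busy: suppress the spawn, lose the tweet
        accepted.append(tweet)
    return accepted
```

Shedding at this point keeps the worker count bounded, but it does nothing about messages already piling up in front of the stream handler, which is the problem the rest of this post deals with.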

ZeroMQ looked ideal: it sits in the application as a library and has nice NIF-based Erlang bindings in erlzmq2. Of special interest was the swap option, which starts buffering messages onto disk once the in-memory limits are exhausted. We played around with a few configurations, and this is how we are now set up. Each of the streaming processes connected to Twitter pushes the messages from the HTTP stream into a PUSH socket. We coded up a ZMQ device consisting of a PULL and XREP/ROUTER socket combination. Each of the consumers is a REQ socket. The PULL-XREP combination lets us balance the load across the different REQ sockets, and each consumer process can decide, depending on the load on its node, the rate at which it pulls messages from the sink. The PUSH socket is swap-enabled, so it swaps excess messages onto disk as needed.

The ZMQ device is implemented as a simple gen_fsm, with the PULL socket in passive mode and the XREP socket in active mode. A message from one of the REQ sockets triggers the cycle, causing a blocking read on the PULL socket. The nice thing about the XREP socket is that it can route messages back to the original requester. The moment the device gets a message off the PULL socket, it sends it to the requesting REQ socket and then waits for the next request. Overall it works like a charm for our current load, which is about 10M tweets a day.
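
The request-driven cycle described above can be sketched without ZeroMQ at all. The following Python simulation uses in-process queues as stand-ins for the sockets, so it only illustrates the control flow (wait for a request, do a blocking read on the inbound side, route the message back to whoever asked); it is not the gen_fsm device and does not use erlzmq2. `PullRouterDevice` and its method names are hypothetical, and the disk swap on the PUSH side is not modelled.

```python
import queue
import threading


class PullRouterDevice:
    """In-process stand-in for the PULL + XREP/ROUTER device."""

    def __init__(self):
        self.inbound = queue.Queue()   # plays the PULL side (fed by producers)
        self.requests = queue.Queue()  # plays the XREP side (fed by consumers)

    def push(self, msg):
        """Producer side: what a streaming process does with its PUSH socket."""
        self.inbound.put(msg)

    def request(self):
        """Consumer side: like a REQ socket, send a request and block for the reply."""
        reply_box = queue.Queue(maxsize=1)
        self.requests.put(reply_box)   # the reply box doubles as the return address
        return reply_box.get()

    def stop(self):
        """Ask the device loop to exit once it is idle and waiting for a request."""
        self.requests.put(None)

    def run(self):
        """Device loop: one message is handed out per request, never unasked."""
        while True:
            reply_box = self.requests.get()  # wait for a consumer to ask
            if reply_box is None:
                break
            msg = self.inbound.get()         # blocking read on the inbound side
            reply_box.put(msg)               # route back to the original requester


if __name__ == "__main__":
    device = PullRouterDevice()
    threading.Thread(target=device.run, daemon=True).start()
    for n in range(3):
        device.push("tweet-%d" % n)
    print([device.request() for _ in range(3)])
    device.stop()
```

Because a message only leaves the device in response to a request, a consumer on a loaded node slows down simply by asking less often, and the backlog accumulates upstream of the device rather than in the consumer's own mailbox.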

Here is what the PULL-XREP device looks like:

Filed under: erlang, inagist, streaming api, twitter, zeromq
Posted by Jebu Ittiachen