What's in a tweet?
Well, the title was supposed to mimic "What's in a name?".
Working with the Twitter Streaming API, one of our challenges has been to efficiently even out the spikes in tweet volume. We have a fixed number of resources crunching the tweets coming off the stream. When breaking news happens, tweets spike and our backends go crazy. We consume tweets off the limited public firehose and the site stream endpoints of the Twitter API. The public firehose comes off a single stream, but the site stream API has a limit of 100 users per connection, so we keep multiple connections to the site stream API to listen to all the user streams. All the public statuses coming off these streams are combined and duplicates dropped. The remaining ones are processed into the system.
We spawn the processes handling the streams on one of the backend nodes, and each tweet coming off a stream spawns a separate process to do the processing. Under normal load this works fine, since these processes are short-lived and they clear off, leaving the system in a healthy state. When spikes occur, the number of spawned processes spikes, the messages queued in the stream handlers spike, and sometimes the node nukes itself by running out of memory. So we then suppressed process spawns if the process count on the node was too high. This meant lost tweets, but a few missed tweets were OK in our scheme of things.
That's when the message queues of the streamers started going bust. Too many messages were coming in from the stream, and we were not clearing the queue fast enough. Process message queues can shoot you in the foot if they are not managed properly: if not cleared fast enough, they will use up all your memory and bring you down real quick. We did not want to introduce an external message queue application into the mix and wanted a simple solution to the problem at hand.
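To make the duplicate dropping and spawn suppression described above concrete, here is a minimal sketch in Python rather than our Erlang. The class name TweetGate, the in-flight cap and the size of the "recently seen" window are all assumptions made up for illustration, not what runs in production.

```python
from collections import OrderedDict


class TweetGate:
    """Drops duplicate tweets and sheds load when too many workers are busy."""

    def __init__(self, max_in_flight, seen_capacity):
        self.max_in_flight = max_in_flight  # hypothetical cap on concurrent workers
        self.seen_capacity = seen_capacity  # hypothetical number of recent ids to remember
        self.in_flight = 0                  # workers currently processing a tweet
        self.seen = OrderedDict()           # recently seen tweet ids, oldest first
        self.dropped = 0                    # tweets lost to load shedding

    def admit(self, tweet_id):
        """Return True if the caller should start a worker for this tweet."""
        if tweet_id in self.seen:
            return False                    # same status arrived on another stream
        self.seen[tweet_id] = True
        if len(self.seen) > self.seen_capacity:
            self.seen.popitem(last=False)   # forget the oldest id
        if self.in_flight >= self.max_in_flight:
            self.dropped += 1               # node is too busy: the tweet is lost
            return False
        self.in_flight += 1
        return True

    def done(self):
        """Call when a worker finishes, freeing a slot."""
        self.in_flight -= 1
```

The gate trades completeness for stability: it protects the worker count, but it does nothing about the messages piling up in front of it, which is the problem described in the last paragraph above.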
ZeroMQ looked ideal: it sits in as a library and has nice NIF-based Erlang bindings in erlzmq2. Of special interest was the swap option, which starts buffering messages onto disk once the in-memory limits are exhausted. We played around with a few configurations, and this is how we are now set up. Each of the streaming processes connected to Twitter pushes the messages from the HTTP stream into a PUSH socket. We coded up a ZMQ device consisting of a PULL and XREP/ROUTER socket combination. Each of the consumers is a REQ socket. The PULL-XREP socket pair allows us to balance the load across the different REQ sockets, and each consumer process can decide, depending on the load on its node, the rate at which it pulls messages from the sink. The PUSH socket is SWAP enabled, so it swaps excess messages onto disk as needed. The ZMQ device is implemented as a simple gen_fsm, with the PULL socket in passive mode and the XREP socket in active mode. A message from one of the REQ sockets triggers the cycle, causing a blocking read on the PULL socket. The nice thing about the XREP socket is that it can route messages back to the original requester: the moment the device gets a message off the PULL socket, it sends it to the REQ socket and then waits for the next request. Overall it works like a charm for our current load, which is about 10M tweets a day.
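The request-driven flow described above can be modelled without any sockets at all. The sketch below is not our gen_fsm and does not use ZeroMQ: it is a small Python simulation in which plain in-memory queues stand in for the PULL and XREP sides, and the class and method names are invented for illustration. Unlike the real device, which serves one request at a time, this version simply parks requests in arrival order.

```python
from collections import deque


class PullRouterDevice:
    """Hands out queued messages only to consumers that have asked for one."""

    def __init__(self):
        self.backlog = deque()  # messages pushed by the streamers (PULL side)
        self.waiting = deque()  # consumers awaiting a reply (XREP/ROUTER side)
        self.delivered = []     # (consumer, message) pairs, standing in for socket sends

    def push(self, message):
        """A streamer pushes a message into the device."""
        self.backlog.append(message)
        self._match()

    def request(self, consumer):
        """A consumer signals that it is ready for exactly one more message."""
        self.waiting.append(consumer)
        self._match()

    def _match(self):
        # Pair the oldest waiting consumer with the oldest queued message.
        while self.backlog and self.waiting:
            consumer = self.waiting.popleft()
            self.delivered.append((consumer, self.backlog.popleft()))
```

Because nothing is delivered until a consumer asks, a loaded node slows down simply by asking less often, and the backlog builds up on the pushing side, where the swap option can spill it to disk.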
Here is what the PULL-XREP device looks like:
In my previous post I mentioned what we do with the search functionality at inagist.com. This post looks into the technical details behind the implementation. We use Riak as our storage layer, and RiakSearch was a natural add-on to it. I will try to detail my understanding of how RiakSearch works and how we use it.
At its heart RiakSearch is an inverted index of terms to document ids. The inverted index maintains an ordered set of document ids, and the merge_index backend, which stores this index, splits it across various files. Specifically, the backend has buffers, which maintain the index in ETS as well as in files, and segments, which are files using a custom format to store ordered keys associated with an (index, field name, field value) tuple. Segments store metadata about file offsets for key lookup, and this metadata is loaded into ETS at startup for faster access. Additionally, bloom filters speed up lookup in each offset. Buffers are periodically merged into segments, and segments, once created, are not updated except when several segments are merged into a single segment, very much like the Bitcask store for Riak. All this happens at the vnode level; Riak Core sits on top and distributes the operations across vnodes. Index name, field name and field value are used to determine the hash for mapping to a vnode. At indexing time a document is split into postings, each of which maps index name, field name and field value to a document id and a bunch of properties. These are batched and sent in parallel to the vnodes responsible for each hash.
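As a rough illustration of the indexing path just described, the following Python sketch splits a document into postings and groups them by target partition. It is my own simplification, not RiakSearch code: the whitespace tokenizer, the "freq" property name, the partition count and the modulo placement are all assumptions (Riak places keys on a consistent hashing ring rather than taking a modulo).

```python
import hashlib
from collections import defaultdict

NUM_PARTITIONS = 8  # hypothetical number of vnodes, for illustration only


def partition_for(index, field, term):
    """Map an (index, field, term) triple to a partition via a hash."""
    key = f"{index}/{field}/{term}".encode("utf-8")
    digest = hashlib.sha1(key).digest()
    return int.from_bytes(digest, "big") % NUM_PARTITIONS


def postings_for(index, doc_id, fields):
    """Split a document into (index, field, term, doc_id, properties) postings."""
    postings = []
    for field, text in fields.items():
        counts = defaultdict(int)
        for term in text.lower().split():  # naive whitespace tokenizer
            counts[term] += 1
        for term, freq in counts.items():
            postings.append((index, field, term, doc_id, {"freq": freq}))
    return postings


def batch_by_partition(postings):
    """Group postings by target partition so each batch can be sent separately."""
    batches = defaultdict(list)
    for posting in postings:
        index, field, term = posting[:3]
        batches[partition_for(index, field, term)].append(posting)
    return batches
```

Since the hash covers only index, field and term, every posting for a given term lands on the same partition, which is what lets a term lookup go to a single place.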
Queries are broken into a set of logical operations which combine the individual matching terms and bring up a final list of matching documents, which are sorted and ranked. A query like "tweet:facebook email" is broken into something like "tweet has facebook and email". This translates to a logical AND of docs having tweet:facebook and tweet:email. These operations are then sent to the vnodes, which stream back the doc ids matching each operation. The doc ids are then merged in order, via a merge sort since the keys are already sorted. This results in a final list of doc ids matching the query, along with the properties for each doc. The results are sorted based on these properties and finally returned. The properties hold a term frequency for each doc, and a pre-plan operation gives the document frequency for each term, allowing the docs to be sorted based on term frequency and inverse document frequency.
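The merge and ranking steps above can be sketched in a few lines of Python. This is a textbook version under my own assumptions, not the RiakSearch implementation: the function names are invented, and the score is a plain sum of term frequency times log(total docs / document frequency), which may differ from the weighting RiakSearch actually applies.

```python
import math


def intersect_sorted(a, b):
    """Logical AND of two ascending doc-id lists in a single merge pass."""
    out, i, j = [], 0, 0
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            out.append(a[i])
            i += 1
            j += 1
        elif a[i] < b[j]:
            i += 1
        else:
            j += 1
    return out


def rank(doc_ids, term_freqs, doc_freqs, total_docs):
    """Order docs by summed TF-IDF, best first.

    term_freqs: term -> {doc_id: occurrences of the term in that doc}
    doc_freqs:  term -> number of docs containing the term
    """
    def score(doc_id):
        return sum(
            term_freqs[term].get(doc_id, 0) * math.log(total_docs / doc_freqs[term])
            for term in term_freqs
        )

    return sorted(doc_ids, key=score, reverse=True)
```

For example, intersecting [1, 3, 5, 7] with [3, 4, 5, 8] leaves [3, 5], and a doc that mentions the rarer term more often is ranked ahead of the other.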
Now that was a whirlwind, simplified wrap-up of my reading of the search code. The thing to note is that it's a very performance-aware implementation for indexing and simple queries. Queries with low cardinality terms could throw off your search times; there is something in the works for this specific issue with inline fields. Queries which could return millions of rows are also possible memory busters: even if you pass options to limit the number of results, the limit is applied after the full results of the operation are in memory. Both of these reiterate the fact that this is, as the name implies, "RiakSearch", an add-on to make your life easier when working with Riak. The implementation is tailored for mating with Riak's map-reduce operations, and that is readily exposed via all the interfaces to RiakSearch.
How do we use it?
The search box on inagist.com is directly wired into RiakSearch. To prevent our queries from leading to memory exhaustion we do a couple of tricks. As previously mentioned, our backend is fully in Erlang and we talk to Riak directly in Erlang. We call search_fold on the riak_search_client directly from our code and break out of the search operation when we have enough results. Our keys, the tweet ids, are stored as negative numbers, so the sort ordering of the keys means we get the first n docs in latest-first order. We then rank within this limited set.
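The negative-key trick is easy to see in a small Python model. This sketch does not call search_fold or any Riak API: a generator stands in for the stream of keys arriving in ascending order, and the function names are hypothetical.

```python
def index_key(tweet_id):
    """Store the id negated so that ascending key order means newest first."""
    return -tweet_id


def fold_until(results, limit):
    """Consume an ascending key stream and stop as soon as `limit` ids are in hand."""
    taken = []
    for key in results:
        taken.append(-key)  # undo the negation to recover the tweet id
        if len(taken) >= limit:
            break           # abandon the rest of the stream
    return taken
```

With tweet ids 101, 105, 103 and 110, the stored keys sort as -110, -105, -103, -101, so fold_until over that stream with a limit of 2 returns 110 and 105 and never touches the older two. This relies on tweet ids growing over time.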
The next place we use search is for threading conversations on the tweet detail page. In an earlier blog post I mentioned how we did that with links and link-map-reduce operations. With search we just index each reply against the tweet it is in reply to and bring it back from search on the reply-to field. This is better, since link updates modify the whole document unnecessarily, when we only needed the metadata on the tweet to be updated.
Another place we plug in search is our clean-up operation. We index the tweet timestamps at minute granularity and clean up tweets older than a certain time period. Getting to older tweets without search would have meant maintaining the ids separately to get a handle on which ids to flush out.
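A minimal Python sketch of the minute-granularity idea follows. The key format (YYYYMMDDHHMM in UTC), the retention period and the helper names are assumptions for illustration; our actual field format is not shown here.

```python
from datetime import datetime, timedelta, timezone


def minute_key(ts):
    """Truncate a timezone-aware timestamp to a minute-granularity term."""
    return ts.astimezone(timezone.utc).strftime("%Y%m%d%H%M")


def expired_minute_keys(now, retention, minutes):
    """Terms for the `minutes` whole minutes just older than the retention cutoff."""
    cutoff = (now - retention).replace(second=0, microsecond=0)
    return [minute_key(cutoff - timedelta(minutes=n)) for n in range(1, minutes + 1)]
```

Each returned term can then be looked up as an exact match on the timestamp field, and the matching tweets flushed, without keeping a separate list of ids by age.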
While you can choose to have RiakSearch index all documents stored in your Riak cluster via a pre-commit hook, we decided to trigger the indexing via our own calls into RiakSearch. There are two reasons for this. First, we found the pre-commit hook failing a couple of times with a timeout under heavy load. Second, our indexing needs meant we index the text of a tweet at the point when the app determines the tweet to be indexable, not at the point of insert into the backend store. Params like the timestamp, however, are indexable at insert time.
Final Thoughts
RiakSearch perfectly complements the Riak key-value store. It frees you from having to access documents by id alone, and managing your data becomes simpler. The fact that it works well with existing Java code for text analysis is also worth mentioning. It's still in beta, so I guess things are only going to get better from here.
How often has Twitter been your source of breaking news? Think of the Mangalore plane crash or the Gulf oil spill updates. How many times in the recent past did you learn about those tidbits from your Twitter stream? Think of the Google Pac-Man doodle or the iPhone 4G leaks. Now think how often you have felt that an important tweet has gone unnoticed because of the volume of tweets that keeps coming in to you, or how often you have refrained from following people because you are not able to keep up with your existing tweets.
If you are not so much into Twitter but have often heard about it, have you wondered if you could get information that mattered to you out of Twitter? Could Twitter be your portal to the web?
We present inagist.com, a real-time tweet filtering system which sifts through the thousands of tweets flowing in to your account and shows you just the ones which are interesting, or thought to be of value by other people. We rank tweets based on the perceived relevance of their content. We then show you the relevant tweets, sorted either by time or by relevance. You can then do the normal activities on a tweet: replying, retweeting, favoriting etc. Additionally, we pull in URL summaries and media previews in place to give a complete reading experience. Tweets are pulled in from the people and lists you follow, so go ahead and either follow that person or add him/her to a list of yours, or follow those lists that @scobleizer has painstakingly put together. We advise you to directly follow only those people that matter to you, so your Twitter page looks clean and interesting; your inagist page will sort and rank tweets from all those numerous others. Once you authorize your Twitter account with InAGist, we regularly sync your account information to keep abreast of your follow changes. Go ahead and try it out at http://inagist.com/<your twitter handle>. We are at jebui, chjeeves, netroy.
For the Twitter-unaware we present inagist channel pages. These are curated Twitter accounts on specific topics: world news is at worldnews.inagist.com, India news at india.inagist.com, geeks at geek.inagist.com. We have created Twitter accounts which follow sources on these topics so you can come in and get your taste of tweets. Tweets from these accounts are ranked as explained before, so you get to see only the relevant tweets from these sources. All the channels are listed at twitter.com/InAGist/channels/members.
We are still in the early stages of development and not yet open to the whole Twitter crowd. Do check whether your account is already enabled at http://inagist.com/<your twitter handle>; if not, please authorize the inagist app and we will enable accounts as we go along.
Your feedback is important to us. Feel free to get back with comments, feature requests, bouquets and brickbats; @inagist is the preferred channel. InAGist is a product offering from Iyotta Software.