Hi,
I just moved my posts from Posterous! Do go though my blog for all the new posts.
Its easy to migrate try JustMigrate
3Crumbs app - Are you the local thrifter we all have been looking for?
Well, the title was supposed to mimic Whats in a name.
Working with the Twitter Streaming API, one of the challenges has been to efficiently even out the spikes in the tweet volume. We have a fixed number of resources crunching the tweets comming off the stream. A breaking news happens, tweet’s spike and our backends go crazy. We consume tweets off the limited public firehose and the site stream endpoints of the twitter api. Public firehose comes off a single stream but the site stream API has a limitation of 100 users per connections. So we have multiple connections to the site stream API to listen to all the users streams. All the public statuses comming off these streams are combined and duplicates dropped. The remaining ones are processed into the system. We spawn off the processes handling the streams on one of the backend nodes and each tweet coming off that would spawn off a different process to do the processing. Normal loads this works fine since these process are short lived ones and they clear off leaving the system in a healthy state. When spikes occur processes spawned spike, messages in the stream handlers spike and sometimes nukes itself with no memory. So we then suppressed the process spawns if the process count was too high on the node. This meant lost tweets but a few missed tweets were ok in our scheme of things. Thats when the message queue of the streamers started going bust. Too many messages comming in from the stream, and we were not clearing the queue fast enough. Now process message queues can shoot you in the foot if they are not managed properly. If not cleared fast enough they will use up all your memory and bring you down real quick. Now we did not want to introduce a message queue external application in the mix and wanted a simple solution to the problem at hand.
ZeroMQ looked ideal, sits as a library, nice NIF Erlang bindings erlzmq2, of special interest was the swap option which would start bufferring messages onto disk once the in-memory limits were exhausted. We played around with a few configurations and this how we are now setup. Each of the streaming processes connected to twitter pushes the message from the http stream into a PUSH socket. We coded up a ZMQ device consiting of a PULL and XREP/ROUTER socket combination. Each of the consumers was a REQ socket. The PULL-XREP socket allows us to balance the load to the different REQ sockets. Now each of the consumer process depending on the load on its node could decide the rate at which it will pull messages from the sink. The PUSH socket is SWAP enabled so it swaps excessive messages on to disk as needed. Overall it works like a charm. The ZMQ device is implemented as a simple gen_fsm, with the PULL socket being in passive mode and the XREP in active mode. A message from one of the REQ sockets triggers the cycle causing a blocking read on the PULL socket. The nice thing about the XREP socket is that it can route messages back to the original requestor. The moment it gets a message off the PULL socket sends it to the REQ socket and then waits for the next request. Overall works like a charm, for our current load which is about 10M tweets a day.
Here is how the PULL-XREP device looks like
https://gist.github.com/1414361
I often get these weird looks whenever I mention that inagist is written in Erlang. So here is some of the key areas where Erlang is a winner for us.
What we do
At inagist we try to summarize a real-time stream in real-time. Currently we work on top of the Twitter stream api. By summarizing I mean filter in real time the popular tweets in a stream and group tweets based on trends. See it in action at justinbieber.inagist.com, libya.inagist.com (try with chrome / safari for best results since it uses websockets). We do this summary on a stream which could be combined in any number of ways. ie; a users own stream (my stream), a keyword based search stream (libya.inagist.com), keyword + geo-location based tweet stream (sxsw.inagist.com).
Lightweight Processes
The key differentiator here is the real-time nature of how we summarize the tweet stream. Instead of possibly persisting each tweet in the stream and running off-line analytics we maintain limited caches for each user and keep popping in and out each tweet as it gains popularity or when key words are repeated in the stream for trend detection. Here is where a key aspect of Erlang fits in. Each of the stream consumer is modelled as an Erlang process, being light weight and isolated. It essentially is a proxy for each user provisioned. It receives tweets from the stream, manipulates the caches and responds to api queries for serving data. Each of these stream consumers are gen_server implementations, tied in a supervisor chain. In case one of the consumers goes down the supervisor brings it back up with no impact on the rest of the user base.
Messages
So how do we couple this to the stream of incoming data, with messages. Each tweet is delivered to the consumers as messages from the stream api client. Each tweet consumer is part of a process group tree spanning across machines. The moment a tweet is received from the n/w it is spawned off as a seperate process which json decodes the tweet and send messages to the distribution tree. The message trickles down to the consumer process which does its job of cache updations. Being asynchronous messages the client is not concerned about how many consumers there are to a tweet. If a consumer process is interested in a tweet it can consume it. Being a decentralized delivery it scales when load of incoming tweets goes up or if the number of consumers increase.
Distributed Erlang
As the number of consumers increase another key aspect of Erlang comes into play. Distribution across machines. Consumer processes by design are known only by a process id. Erlang works with local process ids the same way as it does with distributed processes. As long as a consumer is part of the distribution tree the tweet will be delivered to the process. This helps us in easily scaling out. Individual machines could fail independently without affecting the cluster as a whole only users provisioned on that machine are offline for the time the machine is offline.
Real Time Delivery
To be as real-time as possible we prefer to deliver over websockets to connected clients. Messages come into play here again. Each stream consumer generates messages as its caches figure out a trending tweet or trend. Web socket clients tap into this message stream, convert them to JSON and deliver to the client. Our chrome browser plugin is a websocket client which utilizes this delivery model. The extension notifies via desktop popups when ever a trend is detected or a tweet gains popularity above a certain level. We also bring in a different angle to real-time search with the extension. When a trend is detected the extension automatically starts searching for prominent tweets with those detected trends.
Streaming Search
I have written previously about how we use Riak for search and storage. In addition we have built some custom stuff which enables streaming search. Whenever we index documents in Riak we also send out the indexing data as messages to the search infrastructure. We also send this index terms as soon as a tweet is received on a seperate index. Here we have process waiting for <index, field, value> tuples to match against and notifies waiting processes of a document matching the search criteria. We currently support ==, and, or, >, <, >=, =< operators so we can detect any tweet containing sxsw (==), justin bieber (and), documents containing a text or lat long within a bounding box etc. Stream consumers use this to get real time filtered tweets from the stream. Also the chrome plugin taps into this search stream to notify a user whenever a tweet matches a detected trend or an explicit search query. This is really powerful since we now automatically figure out what a users interest topics are by way of trends and we can let the user know whenever there is something matching this interest topic in real time. This whole streaming search works with such low over head because of the nature of the the message based architecture that we can stream this all the way upto the browser typically under a second or two. You can see it in work when you click on the “Live Stream” heading on pages like these justinbieber.inagist.com.
I have just given a high level view of where Erlang acts as a differentiator but it should give some insight into why we do Erlang all the way. Drop in a comment or @jebui and will be happy to give more information if needed.
PS: I have not heard any of Justin Bieber’s or Lady Gaga’s albums they just happen to have a very active tweet stream.
I had in my previous post mentioned what we do with the search functionality at inagist.com. This post will look into the technical details behind the implementation. We use Riak as our storage layer, RiakSearch was a natural add on to it. I will try to detail my understanding of how riak search works and how we use it.
At its heart RiakSearch is an inverted index of terms to document id’s. The inverted index maintains an ordered set of document id’s, the merge_index backend which stores this index, splits this across various files. Specifically the backend has buffers which maintain the index in ETS as well as files, and segments which are files using a custom format to store ordered keys associated with an index, field name, field value tuple. Segments store metadata information regarding file offsets for key lookup and are loaded into ETS at startup for faster access. Additionally bloom filters speed up lookup in each offset. Buffers are periodicaly merged into segments, and segments once created are not updated except for the merge of segments into a single segment. Very much like the BitCask store for Riak. All this happens at the vnode level and riak core sits on top of all this and distributes the operations across vnodes. Index name, field name and field value are used to determine the hash for mapping to a vnode. At indexing time a document is split into postings which have index name, field name, field value mapping to document id and a bunch of properties. These are batched and send to vnodes responsible for each hash in parallel.
Queries are broken into a set of logical operations which combine each individual matching term and brings up a final list of matching documents which are sorted and ranked. A query like “tweet:facebook email” is broken into something like “tweet has facebook and email”. This translates to a logical and of docs having tweet:facebook and tweet:email, these operations are then send to the vnodes to stream the doc-id’s matching this operation. The doc id’s are then merged in order, via a merge sort since the keys are already sorted. This results in a final list of doc id’s matching the query and the properties for each doc. The results are sorted based on these properties and finally returned. The properties have a term frequency for each doc and a pre-plan operation give the document frequencies for each term allowing to sort the docs based on term frequency and inverse document frequency.
Now that was a whirl-wind simplified wrap up of my reading of the search code. To note here is that its a very performance aware implementation for indexing and simple queries. Queries with low cardinality terms could throw away your search times, there is something in the works for this specific issue with inline fields. Also queries which could return millions of rows are also possible memory busters, even if you give options to limit the number of results these are operations which happen after the full results of the operation are in memory. Both of these re-iterates the fact that this is as the name implies “RiakSearch”, an addon to make your life easier when working with Riak. The implementation is tailored for mating with the map-reduce operations for riak and that is readily exposed via all the interfaces to riak search.
How do we use it?
The search box on inagist.com is directly wired into riaksearch. To prevent our queries from leading to memory exhaustion we do a couple of tricks. As previously mentioned our backend is fully in Erlang and we directly talk to Riak in Erlang. We directly call the search_fold on the riak_search_client from our code and break the search operation when we have enough results. Our keys, the tweet id’s are stored as negative numbers so that the sort ordering of the keys means we get the first n docs ordered in a latest first manner. We then rank them in this limited set.
The next place we use search is for threading conversations on the tweet detail page. I had in an earlier blog post mentioned how we did that with links and and link-map-reduce operations. With search we just index the replies against the tweet it is in reply to and bring it back in from search on the reply to field. This is better since link updates modifies the whole document un-necessarily, where we only needed the meta-data on the tweet to be updated.
Another place we plugin search is to run our clean-up operation. We index the tweet timestamps to the minute granularity and cleanup tweets older than a certain time period. Getting to older tweets without search would have meant we maintain the ids separately to get a handle on which id’s to flush out.
While you can choose to have riaksearch index all documents that are stored in your riak cluster via a pre-commit hook, we decided to trigger the indexing via our own calls into riak search. Two reasons to this, we found the pre-commit hook fail a couple of time with a timeout under heavy load, also our indexing needs meant we index the text in a tweet at a point when the tweet was determined to be indexable by the app and not at the point of insert into the back end store. Params like the time stamp however are indexable at insert time.
Final Thoughts
RiakSearch perfectly complements Riak key value store. It frees you from having to access documents by id alone and managing your data is simpler. The fact that it works well with existing java code for text analysis is also worth mentioning. Its still in beta so I guess things are only going to get better from here.
We recently pushed out a search box on inagist.com, which looks pretty normal but does a whole different take on twitter search. A normal twitter search shows a couple of prominent tweets and the whole real time result stream. We give it two different views. One a search within your follow circle, and another the broader universe, additionally not all tweets are indexed but only the prominent tweets. So a search for “facebook email” after I login to inagist shows me two sections, sorted by activity, allowing me to quickly catch up on something, as perceived by people I’m interested in and the general public.
We take this a step further to apply to trends that we detect in your stream or the channels you follow. As I’m writing this post I get an alert from the chrome plugin that “Prince William” is trending in World News. I click through on the trend and get a gist of the tweets which caused it to trend in World News and also a bunch of tweets from my follow list on “Prince William”.
Hope this enhances your experience on inagist.com. We also use this search to power our Related Tweets widget, which you can see in action on the LiveMint blog.
Behind all of this is riak search from the wonderful guys at Basho. I will get more into the technicals of the search and experiences of using riak search in the next post. In the meanwhile do search around and let us know how you are liking / disliking it.
At inagist.com we want to make sure you get your news as it happens. The traditional web interface has its limitations in realizing this experience. While we would like you to stay glued to our site seeing new tweets and trends or keep watching the stream flow by, we guess you might need to step out once in a while :) So we now integrate with notify.io and push events as they happen to you. You decide how you want to receive it, on your iphone, desktop, SMS or IM.
To get started with push login via Twitter to inagist.com, go to your settings page and add your email id or md5 hash to the Jabber/Gmail/Notify.io text box down below. Whenever something of interest comes by next we will send it your way. Make sure you allow events from inagist in your notify.io settings page.
We push tweets which have crossed a certain threshold in terms of activity, trends picked up in your stream and trends picked up in your favorite channels. All of these are tagged nicely to optionally filter them nicely on your client. Tweets are tagged “tweet”, personal trends as “trend”, and channel trends as “channel_trend” with an additional tag of the channel name. So trends from worldnews.inagist.com is tagged with “channel_trend worldnewsgist”.
In the rare chance of notify.io being down we do have our own xmpp bot you can befriend at inagist@talkr.im from the jabber id you have configured (md5 hash will not do) above and you will get xmpp messages from the inagist bot directly.
Try it out and let us know, on how we can improve things.
My last post felt a little incomplete without some code backing it up. I’m following it up with a code sample of how exactly this map reduce is wired up.
I will walk through how we do the “Popular Replies” section on the conversation page. Again here is a @BarackObama tweet, with more than a 500 replies. Popular replies extracts only those replies which have been further replied to, re-tweeted or a reply from the author of the tweet itself. Right now its picked out 1 of these 500+ replies.
Data Model
Resonses to a tweet are captured in a bucket of its own «”tweet_responses_bucket”». Each tweet is keyed by its tweet id as a 128 bit binary «TweetId:128». Response details are not stored directly on this resource but a linked value in a bucket called “tweet_responses_subkeys_bucket”. Responses are stored as links on a resource keyed as «TweetId:128, (ResponseId rem 10):8» in this bucket. This resource is added as a link on the {«”tweet_responses_bucket”», «TweetId:128»} resource and tagged as «”tweet_response”». A reply is recorded as a link of the form {{«ResponseId:128», «ResponseAuthorId:128»}, «”reply”»}. A link is represented as {{Bucket, Key}, Tag}, this link does not point to a valid bucket, key pair but is purely for our own interpretation.
Here is how it would look
<<"tweet_responses_bucket">> ---------------------------- |----------------------------------------| | <<20337776197:128>> | |----------------------------------------| | Links | | | | {{<<"tweet_responses_subkeys_bucket">>,| | <<20337776197:128,0:8>>}, | | <<"tweet_response">>}, | | {{<<"tweet_responses_subkeys_bucket">>,| | <<20337776197:128,1:8>>}, | | <<"tweet_response">>}, | | .... | |----------------------------------------| | Value | | | |----------------------------------------| <<"tweet_responses_subkeys_bucket">> ------------------------------------ |----------------------------------------| | <<20337776197:128,0:8>> | |----------------------------------------| | Links | | | |{{<<20339861590:128>>,<<18035803:128>>},| | <<"reply">>}, | | .... | |----------------------------------------| | Value | | | |----------------------------------------| |----------------------------------------| | <<20337776197:128,1:8>> | |----------------------------------------| | Links | | | |{{<<20337857101:128>>,<<82294968:128>>},| | <<"reply">>}, | | .... | |----------------------------------------| | Value | | | |----------------------------------------|
Code
And now here is the piece of code this does the extraction of the popular replies. The function gives a sorted list of {TweetId, AuthorId} tuples which are then looked up and served.
https://gist.github.com/510070
Hopefully the code is self explanatory. Of interest is the make_local_fun which creates a function reference which can be passed over to a remote node, without the remote node having a copy of this compiled code in its path.
Feel free to comment on anything I have overlooked or could be done better :)
Its been often asked of us, “How do you decide scores for the tweets that you show on inagist.com?” Well here is how we determine what to show and what not to show.
We divide tweets coming in from the Twitter stream into 2 categories, ones which have a URL and ones which do not. The assumption being that tweets with URL tend to be teasers to the content at the linked URL. Tweets from different users mentioning the same URL (possibly through a URL shortner) are scored against the URL.
Another basic assumption is how we classify your follow community. The follow community is composed of the people you follow directly and people whom you follow indirectly through lists. If you follow a person directly we give a higher score to activity from that user opposed to activity from a user in one of the lists that you follow. The reasoning here being that people you directly follow is what you see on most Twitter clients by default and possibly people whom you care about more.
Once your twitter id is enabled on inagist.com, we fetch your follow community and start watching the twitter stream on your behalf for activity from, or about, the people in this community. A retweet or a reply to a tweet makes an activity on a tweet. A retweet of a person you directly follow by a person in one of your lists, gains a higher score than a retweet of that person by someone not in your community. And a retweet of that person by a person you directly follow gains an even higher score. Similar scoring applies to replies except that reply from a person not in your community to a person in your community does not count. Conversations between people you directly follow gain higher scores than conversations across people in your lists.
We present tweets which have a high score sorted by time on your user page. And yes options are coming soon to sort by score and filter by the score.
We do all of this in real-time for each enabled user on inagist. To be reasonable on the costing behind all of this we use bounded caches for scoring tweets. So if you have more people in your follow community you will see tweets expiring faster from your main page. We do archive these tweets which we discover, so you can go back and see those tweets which we had picked out in the past. Clicking on a list name pulls in tweets from the archive for that particular list.
Conversations page gives you more context on the activity surrounding these filtered tweets. Clicking on the activity icon
takes you to the conversation page where all the replies to a tweet are shown in order. We also present two different views on these replies. Popular replies are ones which have been further replied to or retweeted. Replies from your friends brings out only replies from your community.
We encourage you to sign in and see this in action for yourself. Also get into Twitter and start following lists of your choice on topics that interest you, or create lists of your own and curate your sources. May we recommend verified, tlists and scobleizer for a variety of lists to get you started. Continue using twitter with your favorite client and inagist.com to surface interesting tweets from all your sources, we assure you, you will not miss a single one of those interesting 140 characters.