Big Data at Khan Academy

At Khan Academy, we currently have three frameworks for extracting meaningful information from the data we collect as students and teachers work their way through our content.  Every day, we collect around 8 million data points on exercise and video interactions, and a few million more around community discussion, computer science programs, and registrations.  On top of that, we have the raw web request logs and some client-side events that we send to MixPanel.

The most mature system runs Hive on Amazon EMR.  This year, we have created a few jobs that run on top of the Google Mapreduce and Pipeline frameworks directly within Appengine.  And, very recently, we have started exporting backups to Google Cloud Storage, which Google BigQuery can consume and run queries over at an insane speed.

We use Hive and GCS for data warehousing and giant queries that do not need to output pristine, realtime information.  The Hive system has worked well since it was created, but now that the Google technologies are catching up, I am hopeful that continuing to move in that direction will make our lives much easier.

The Mapreduce library allows us to perform queries and data maintenance over production data, on production servers.  The Pipeline library is a low-level distributed computing framework which is used for advanced operations that do not fit the Mapreduce semantics, which usually means chaining several Mapreduce jobs together.

Hive and Amazon Elastic MapReduce

Moving our data out of the production datastore and into Hive gives us a safe place to iterate on queries using powerful HiveQL features like UNION, sub-queries, and, most importantly, JOIN.  With these, we can answer questions like “Based on their students’ activity, which coaches have been the most active in the past year?”  We also use this system to train our machine learning models to understand students’ knowledge state and recommend the content most likely to help them improve.

This setup requires a few transformations to get the data into a format that Hive can understand.  Here is an outline of what happens.

  1. An hourly cron job running on an Amazon EC2 machine calls an API on khanacademy.org to download any datastore entities that have changed in the last hour.  Entities are transferred in protobuf format, converted to JSON, written to a local file, and compressed.  Each file contains JSON objects for all of the entities changed during that hour for a particular Model.  Each line has the key for the entity, and the JSON string which represents it:
    • "entity key"\t"a giant JSON string"
  2. A daily cron job running on the same machine launches several Hadoop jobs to concatenate all of the hourly data files into a single daily partition, which we write to Amazon S3.  This results in a single partition per day, per Model.  Each line of these files is in the same format mentioned above.
    • s3://ka/entity_store/ProblemLog/dt=2013-09-12/data.gz
  3. With the data in this format, we can mount rudimentary tables in Hive with:
    CREATE EXTERNAL TABLE IF NOT EXISTS ProblemLog (
        user string, json string
    )
    PARTITIONED BY (dt string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    LOCATION 's3://ka/entity_store/ProblemLog';
    ALTER TABLE ProblemLog RECOVER PARTITIONS;
  4. We then use Hive to transform this data into a more tractable format.
    INSERT OVERWRITE TABLE user_exercise_summary PARTITION (dt='${dt}')
    SELECT
      parsed.user, parsed.exercise, SUM(parsed.time_taken), 
      SUM(parsed.correct), SUM(parsed.wrong),
      MAX(IF(parsed.proficient, 1, 0)) = 1
    FROM (
      SELECT
        get_json_object(ProblemLog.json, '$.user') AS user,
        get_json_object(ProblemLog.json, '$.exercise') AS exercise,
        get_json_object(ProblemLog.json, '$.time_taken') AS time_taken,
        IF(get_json_object(ProblemLog.json, '$.correct') = "true", 1, 0) AS correct,
        IF(get_json_object(ProblemLog.json, '$.correct') != "true", 1, 0) AS wrong, 
        get_json_object(ProblemLog.json, '$.earned_proficiency') = "true" 
          AS proficient
      FROM ProblemLog
      WHERE ProblemLog.dt = '${dt}'
    ) parsed
    GROUP BY parsed.user, parsed.exercise;
  5. Finally, we shuttle any results we want to surface on a dashboard into a MongoDB instance, which can be easily accessed from JavaScript via sleepy.mongoose, a REST interface for MongoDB.
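
To make the file layout from steps 1 and 2 concrete, here is a minimal Python sketch of writing and reading the key-tab-JSON lines and building the daily partition path.  This is not our actual export code: the function names are hypothetical, and it assumes entity keys never contain a tab or newline.

```python
import gzip
import json


def format_entity_line(entity_key, entity):
    """Return one 'key<TAB>json' line, the layout described in step 1."""
    # json.dumps escapes tabs and newlines inside string values, so the
    # only raw tab on the line is the separator (assuming the key has none).
    return "%s\t%s\n" % (entity_key, json.dumps(entity, sort_keys=True))


def write_hourly_file(path, entities):
    """Write (key, dict) pairs to a gzip-compressed file, one per line."""
    with gzip.open(path, "wt", encoding="utf-8") as f:
        for entity_key, entity in entities:
            f.write(format_entity_line(entity_key, entity))


def read_entity_lines(path):
    """Yield (key, dict) pairs back out of a file written as above."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for line in f:
            entity_key, raw_json = line.rstrip("\n").split("\t", 1)
            yield entity_key, json.loads(raw_json)


def daily_partition_path(model, dt):
    """Build the S3 location of one daily partition, as shown in step 2."""
    return "s3://ka/entity_store/%s/dt=%s/data.gz" % (model, dt)
```

Splitting on the first tab only is what lets Hive treat the rest of the line as a single JSON string column.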

Now the data exists in a few places which provide varying levels of accessibility.  You can fire up a Hive instance to do some heavy duty queries, or issue a REST call to download a compact summary.
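
The compact summary is essentially the per-user, per-exercise roll-up computed in step 4.  As a rough illustration of what that query calculates, here is the same aggregation sketched in plain Python.  The field names come from the query above; the function name is hypothetical, and the sketch assumes the JSON has already been parsed so that `correct` and `earned_proficiency` are booleans and `time_taken` is a number.

```python
from collections import defaultdict


def summarize_problem_logs(problem_logs):
    """Aggregate parsed ProblemLog dicts by (user, exercise).

    Mirrors the SELECT in step 4: total time taken, number of correct
    and wrong attempts, and whether proficiency was ever earned.
    """
    summary = defaultdict(
        lambda: {"time_taken": 0, "correct": 0, "wrong": 0, "proficient": False}
    )
    for log in problem_logs:
        row = summary[(log["user"], log["exercise"])]
        row["time_taken"] += log["time_taken"]
        if log["correct"]:
            row["correct"] += 1
        else:
            row["wrong"] += 1
        # Equivalent to MAX(IF(proficient, 1, 0)) = 1 in the Hive query.
        row["proficient"] = row["proficient"] or bool(log.get("earned_proficiency"))
    return dict(summary)
```

This only works for data that fits in memory, which is exactly the limitation that pushes the real job into Hive.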

Google Mapreduce on Appengine

Our most common use of the Mapreduce library is to perform schema changes and data clean-up across many entities in the datastore.  For example, removing a now-unused property from a Model, awarding time-based achievement badges, or recalculating exercise progress based on the eventually-consistent problem logs.

Another common use is calculating aggregate data to determine things like the average time spent completing an exercise.
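
To show the shape of such a job, here is a small sketch of the average-time calculation written as a map step and a reduce step.  It deliberately does not use the App Engine Mapreduce library API: it runs in-process in plain Python, and all function names are hypothetical.

```python
from collections import defaultdict


def map_problem_log(log):
    """Map step: emit (exercise, time_taken) for one problem log."""
    yield log["exercise"], log["time_taken"]


def reduce_average(exercise, times):
    """Reduce step: average all of the times emitted for one exercise."""
    return exercise, sum(times) / float(len(times))


def run_local_mapreduce(logs):
    """Run the map, shuffle, and reduce phases in-process."""
    grouped = defaultdict(list)
    for log in logs:
        for key, value in map_problem_log(log):
            grouped[key].append(value)  # the "shuffle": group values by key
    return dict(reduce_average(key, values) for key, values in grouped.items())
```

In the real framework the shuffle is distributed across many shards, but the contract of the two functions is the same.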

The Mapreduce library is powerful, but can be difficult to get working exactly as you want.  I will talk in depth about how we use this library in a later post.

Google BigQuery on Cloud Storage

The Google Cloud Storage and BigQuery pipeline is conceptually very similar to the EMR and Hive pipeline, but much, much easier to set up.  BigQuery provides many powerful statements, but lacks the ability to add custom map or reduce functions, which we have done in Hive.  We have just started playing with these technologies, but the speed of both setup and query results has been very impressive.  I hope to write more on this topic after we have used it more extensively.

Here are the steps I went through to set up and execute a simple query over one Model:

  1. From the Datastore Admin page, select the entities you want to back up, and click Backup Entities.  Specify Google Cloud Storage and the bucket name, and then start the job.  This will create a hierarchy of files, the bottom of which actually contains the data:
    • <key>.backup_info – The job-level manifest file.
    • <key>.YourModel.backup_info – The Model-level manifest file.
    • <name>_<date>_<key2>-output-<shardNumber>-retry-<retryCount>
  2. From the Google BigQuery console, click through to Create a New Table, and then select Google Cloud Storage on the Select Data stage.  Specify the <key>.YourModel.backup_info file created in the previous step as the source of data.
  3. After the job is finished, you can query the table with BigQuery!

This is easy to do by clicking around in the UI, but automating the entire process is a bit more difficult, which is what we are currently working on.
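
As one possible starting point for that automation, step 2 can be expressed as a BigQuery load job whose source format is DATASTORE_BACKUP.  The sketch below only builds the job body for the BigQuery REST API; it is not what we run today, the function name is hypothetical, and authenticating and submitting the job are left out.

```python
def datastore_backup_load_config(project_id, dataset_id, table_id, backup_info_uri):
    """Build the body of a BigQuery load job for a Datastore backup.

    backup_info_uri should point at the Model-level manifest file,
    i.e. the <key>.YourModel.backup_info file from step 1.
    """
    if not backup_info_uri.startswith("gs://"):
        raise ValueError("expected a gs:// URI, got %r" % backup_info_uri)
    return {
        "configuration": {
            "load": {
                # Tells BigQuery to read the schema from the backup itself.
                "sourceFormat": "DATASTORE_BACKUP",
                "sourceUris": [backup_info_uri],
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": dataset_id,
                    "tableId": table_id,
                },
                # Replace the table contents on every load (an assumption;
                # appending may suit other workflows better).
                "writeDisposition": "WRITE_TRUNCATE",
            }
        }
    }
```

The remaining work is mostly plumbing: triggering the backup on a schedule, finding the manifest file it produced, and submitting this body once the backup has finished.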

8 thoughts on “Big Data at Khan Academy”

  1. As someone who writes hive queries at FB all day long, the multiple calls to GET_JSON_OBJECT() make me sadpanda.gif.

    You can replace these with a single call to JSON_TUPLE(). At scale, at least our scale, the savings add up.

    Try

    SELECT….
    FROM ProblemLog
    LATERAL VIEW JSON_TUPLE(ProblemLog.json, 'user', 'exercise', 'time_taken', 'correct', 'earned_proficiency') pl AS problem_log_user, problem_log_exercise, problem_log_time_taken, problem_log_correct, problem_log_earned_proficiency

    and then you can refer to (and re-use) the extracted JSON values.

    LATERAL VIEW docs https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView#LanguageManualLateralView-LateralViewSyntax

    JSON_TUPLE() docs
    https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF

    • Thanks for the callout, Neil. We do use the LATERAL VIEW/JSON_TUPLE approach in many other queries, and I agree that it is a better approach. We just haven’t gotten around to optimizing this example, yet.

      Although, I have seen some weird exceptions with this approach, as I mentioned in this previous blog post. I couldn’t find a copy of the query that caused the exception mentioned, but let me know if the stacktrace looks familiar and you have any clues.

      http://mattfaus.com/2013/03/manually-parsing-scientific-notation-in-hive-0-81/

  2. Thanks for sharing. I did have a few questions though: it sounds like you run multiple EMR jobs throughout the day. Are all the jobs defined beforehand, or are some jobs issued in an ad-hoc manner? If some jobs are issued in an ad-hoc manner, do you employ any sort of job management tool? Given that you do run many queries, are there any EMR-specific issues that you’ve encountered (long spin-up time, cost, etc.)? Finally, do you have any issues with the fact that EMR is ephemeral in nature (without any option for using EBS)? Again, thanks for the fun post!

    • We do run some queries in an ad-hoc manner, and we just use Hadoop’s JobTracker to understand what’s going on. Although, we generally don’t have too many concurrent ad-hoc jobs, since our analytics team is only a few people.

      The long spin-up time is the most annoying aspect of developing new queries, and avoiding it is one of the major things I like about BigQuery. Once the data is loaded into BigQuery, you can run very complex queries incredibly quickly. Cost hasn’t been an issue since the machine time we use to service these queries pales in comparison to that needed to serve the website traffic (although I don’t review billing details).

      I’m not sure I understand your last question. In step 4 of my example, if you assume user_exercise_summary is defined similarly to ProblemLog (i.e. with LOCATION 's3://ka/entity_store/ProblemLog'), then this query will write data into S3, where it is available for anyone else to use as well. I also occasionally write data to the local disk, transform it to CSV, and then scp it to my local computer so I can send it as a spreadsheet to others on the team.

      • Thanks for the great response. For my last question, I’ve observed a few times that people will create a “persistent” EMR (usually so they can run a few ad-hoc queries), and then forget to back up key data to S3. Sounds like this isn’t an issue with you guys though.

  3. Great article!
    Since you are already using some services from Amazon, I wonder if you have tried Amazon RedShift? If so, I am curious to know the reasons why you have chosen BigQuery over RedShift. Thanks!

    • We have not tried RedShift (yet). Since all of our production data lives in Google’s Datastore, we’re hoping that moving it from there to Google BigQuery will be the easiest/fastest option available.

      • Ok. I was wondering about that since some of your data is already in Amazon S3 for your EMR jobs. Once your data is in Amazon S3, it is really easy and fast to load files from S3 into Redshift (it is just a single command to run).
        It makes sense for you to use BigQuery if your data is already in Google Cloud Storage. However, with BigQuery, costs can rise quickly once you have a lot of data and a lot of queries to run. It all depends on what your needs are.
