Building a machine learning pipeline in Google’s Cloud

Khan Academy uses a few statistical models to personalize content around the site. One of the most important, the knowledge model, has been discussed on this blog before. The process of training these models: gathering data, training parameters, deploying parameters, and monitoring parameter performance, is tedious. I set about to automate as much of this process as possible. Discovering parameters requires a machine that 1) is able to run the scipy.optimize python module, and 2) has several GB of memory. These requirements and the benefits of close integration with Khan Academy’s existing Google Cloud systems (read: pretty much everything) led me to choose Google’s new managed VMs with some help from BigQuery to implement this system.

I have discussed the training process for the knowledge model before, but a quick refresher is as follows:

  1. ProblemLogs store a student’s correct or incorrect attempt on a single problem for a single exercise.
  2. ProblemLogs are processed in chronological order to re-create the KnowledgeState that existed for that student at the moment the question was answered.
  3. This creates a training set like KnowledgeState -> Correct/Incorrect that lets us predict whether students will get the next question correct based on their current KnowledgeState.
  4. We maintain a single KnowledgeState object for each student, but train parameters for each exercise, such that each exercise interprets the KnowledgeState in a different way.
  5. Right before prediction, the global KnowledgeState is augmented with local ExerciseState data to improve predictions, but this detail is omitted in the rest of this article.

Here’s a data flow diagram of the entire system:

A dataflow diagram of the automated knowledge model parameter training process.

Gathering the data

The ProblemLog table which contains the students’ history of attempts exists in BigQuery. Exporting from BigQuery to CSV files on Google Cloud Storage was pretty easy with the BigQuery API. However, when exporting >1GB of data, BigQuery shards the export into many 170MB files.

Munging the data

The problem with these 170MB files is that an individual student’s data is spread randomly among and within the shards, but I need to iterate over a single student’s data in chronological order. Ideally, an ORDER BY clause in BigQuery could solve this problem, but all of my attempts ended with a Resources Exceeded error. (Which is pretty common, based on the BigQuery section of Stack Overflow.)

So, I created a MapReduce job to map over each of the shards, sort them in-memory, and then write the sorted shard back out to Google Cloud Storage. This was a map-only job that used the _GoogleCloudStorageInputReader provided by the MapReduce library and streamed the sorted file back into GCS with the appengine GCS client library. Here’s a gist of this code.

Computing the features

The next step is to read each student’s data in chronological order. The previous step sorted the problem logs by student within each shard, but chronological ordering must be done across the shards. I created a MergeReader class that opens a handle to each of the shard files and collects a single student’s problem logs from the top of each shard. When all of a single student’s problem logs are in memory, they are sorted chronologically, and processed to emit training data.

As training data is emitted, a BatchWriter class writes that data to multiple shards, making sure that all data for an individual exercise is written to the same shard. With each shard wholly containing an exercise’s training data, the parameter training process can be parallelized.

Training the parameters

Now that the training data has been computed and is waiting in properly sharded .csv files on GCS, the parameter training process can begin. The training process is another MapReduce job that uses the _GoogleCloudStorageInputReader to iterate over each of the training data shards. All training data for an exercise is loaded into memory, formatted as a numpy array, and sent over to scipy.optimize to discover optimum theta values. Once an exercise’s parameters are trained, they are written into the production datastore so that we can start using them for predictions moving forward.

Monitoring parameter performance

After new parameters are deployed, we monitor their accuracy with a simple dashboard that plots average log likelihood and number of samples over time. This graph is generated with a BigQuery that aggregates ProblemLogs on a per-exercise/per-day basis and computes average log likelihood for each day.

A graph showing average log likelihood and number of samples over time for a specific instance of the knowledge model.