10.20.2014

Scala: How to Limit DynamoDB's Range Query

Scala: AWS DynamoDB のテーブルに対して件数制限付きのレンジクエリを実行する

 

DynamoDB の range クエリについて、理解が足りなかったので整理しておく。

今回は例として、食事の記録を以下の項目とともに DynamoDB に格納し、
ユーザごとの最新 n 件の食事を調べるクエリを投げるようなアプリケーションを考えてみる。

DynamoDB: FoodLog
  • UserId [ハッシュキー]: ユーザID (文字列)
  • Timestamp [レンジキー]: タイムスタンプ(epoch からの経過時間をミリ秒単位で格納) (整数値)
  • Food: 食事した内容 (文字列)
  • Calorie: 摂取カロリー (整数値)

 

テーブル作成

aws-cli で以下のコマンドを実行し、FoodLog テーブルを作成する。
(aws-cli および認証情報はセットアップ済みの前提)

$ aws dynamodb create-table \
--table-name FoodLog \
--attribute-definitions \
AttributeName=UserId,AttributeType=S \
AttributeName=Timestamp,AttributeType=N \
--key-schema AttributeName=UserId,KeyType=HASH AttributeName=Timestamp,KeyType=RANGE \
--provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=1

 

ベース部分実装

Java のライブラリと O/R マッパーを使って FoodLog クラスを実装。
接続情報は環境変数(AWS_ACCESS_KEY, AWS_SECRET_KEY)より与えられる想定である。

package com.github.mogproject.example.dynamodb

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.datamodeling._

import scala.annotation.meta.beanGetter
import scala.beans.BeanProperty
import scala.collection.JavaConverters._

trait DynamoDBClient {
  private[this] val accessKeyId = sys.env("AWS_ACCESS_KEY")
  private[this] val secretAccessKey = sys.env("AWS_SECRET_KEY")
  private[this] val region = RegionUtils.getRegion("ap-northeast-1")
  private[this] val endpoint = region.getServiceEndpoint("dynamodb")

  private[this] val credentials = new BasicAWSCredentials(accessKeyId, secretAccessKey)
  private[this] val client = {
    val ret = new AmazonDynamoDBClient(credentials)
    ret.setRegion(region)
    ret.setEndpoint(endpoint)
    ret
  }
  protected val mapper = new DynamoDBMapper(client)

  def batchSave(xs: FoodLog*) = mapper.batchSave(xs.asJava)

  def batchDelete(xs: FoodLog*) = mapper.batchDelete(xs.asJava)

  def batchWrite(toWrite: Seq[FoodLog], toDelete: Seq[FoodLog]) = mapper.batchWrite(toWrite.asJava, toDelete.asJava)
}

@DynamoDBTable(tableName = "FoodLog")
case class FoodLog(
                    @(DynamoDBHashKey@beanGetter)(attributeName = "UserId") @BeanProperty var userId: String,
                    @(DynamoDBRangeKey@beanGetter)(attributeName = "Timestamp") @BeanProperty var timestamp: Long,
                    @DynamoDBAttribute(attributeName = "Food") @BeanProperty var food: String,
                    @DynamoDBAttribute(attributeName = "Calorie") @BeanProperty var calorie: Int
                    ) {
  def this() = this(null, 0, null, 0)
}

object FoodLog extends DynamoDBClient {
  def readRecent(userId: String, limit: Int): Seq[FoodLog] = ???
}

 

テストデータ投入

REPL を使い、2ユーザx10件ずつのランダムなテストデータを投入する。

$ export AWS_ACCESS_KEY="xxxxxx"
$ export AWS_SECRET_KEY="xxxxxx"
$ sbt console

scala> import com.github.mogproject.example.dynamodb.FoodLog
import com.github.mogproject.example.dynamodb.FoodLog

scala> import scala.util.Random
import scala.util.Random

scala> val item1 = (1 to 10).map(i => FoodLog("user-1", Random.nextInt(100000), s"food-$i", Random.nextInt(2000)))
item1: scala.collection.immutable.IndexedSeq[com.github.mogproject.example.dynamodb.FoodLog] = Vector(FoodLog(user-1,78548,food-1,1911), FoodLog(user-1,67632,food-2,974), FoodLog(user-1,34756,food-3,1639), FoodLog(user-1,15595,food-4,937), FoodLog(user-1,77366,food-5,158), FoodLog(user-1,9615,food-6,393), FoodLog(user-1,64601,food-7,429), FoodLog(user-1,6847,food-8,1834), FoodLog(user-1,55271,food-9,1434), FoodLog(user-1,74394,food-10,885))

scala> val item2 = (1 to 10).map(i => FoodLog("user-2", Random.nextInt(100000), s"food-$i", Random.nextInt(2000)))
item2: scala.collection.>immutable.IndexedSeq[com.github.mogproject.example.dynamodb.FoodLog] = Vector(FoodLog(user-2,15618,food-1,1356), FoodLog(user-2,27456,food-2,123), FoodLog(user-2,62137,food-3,1122), FoodLog(user-2,43501,food-4,673), FoodLog(user-2,80906,food-5,577), FoodLog(user-2,96682,food-6,1112), FoodLog(user-2,40193,food-7,1961), FoodLog(user-2,44857,food-8,1064), FoodLog(user-2,88767,food-9,1618), FoodLog(user-2,42126,food-10,761))

scala> FoodLog.batchSave(item1 ++ item2: _*)
res0: java.util.List[com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper.FailedBatch] = []

 

クエリ部分(仮)実装

件数制限付きのクエリを素直に書くと、以下のようになる。

  def readRecent(userId: String, limit: Int): Seq[FoodLog] = {
    val query = new DynamoDBQueryExpression[FoodLog]()
      .withHashKeyValues(FoodLog(userId, 0, null, 0))
      .withScanIndexForward(false)
      .withLimit(limit)
      .withConsistentRead(false)
    mapper.query(classOf[FoodLog], query).asScala
  }

そして limit=5 としてクエリを実行すると、結果は …… 10個ある!?

$ sbt console
scala> import com.github.mogproject.example.dynamodb.FoodLog
import com.github.mogproject.example.dynamodb.FoodLog

scala> FoodLog.readRecent("user-1", 5)
res0: Seq[com.github.mogproject.example.dynamodb.FoodLog] = Buffer(FoodLog(user-1,78548,food-1,1911), FoodLog(user-1,77366,food-5,158), FoodLog(user-1,74394,food-10,885), FoodLog(user-1,67632,food-2,974), FoodLog(user-1,64601,food-7,429), FoodLog(user-1,55271,food-9,1434), FoodLog(user-1,34756,food-3,1639), FoodLog(user-1,15595,food-4,937), FoodLog(user-1,9615,food-6,393), FoodLog(user-1,6847,food-8,1834))

scala> res0.size
res1: Int = 10

scala> res0 foreach println
FoodLog(user-1,78548,food-1,1911)
FoodLog(user-1,77366,food-5,158)
FoodLog(user-1,74394,food-10,885)
FoodLog(user-1,67632,food-2,974)
FoodLog(user-1,64601,food-7,429)
FoodLog(user-1,55271,food-9,1434)
FoodLog(user-1,34756,food-3,1639)
FoodLog(user-1,15595,food-4,937)
FoodLog(user-1,9615,food-6,393)
FoodLog(user-1,6847,food-8,1834)

 

これはどういうことなのか

sbt run で実行可能なプログラムを作成し、build.sbt に以下の記述を行って http-wire ログを出力してみる。

javaOptions in run ++= Seq(
  "-Dorg.apache.commons.logging.Log=org.apache.commons.logging.impl.SimpleLog",
  "-Dorg.apache.commons.logging.simplelog.showdatetime=true",
  "-Dorg.apache.commons.logging.simplelog.log.org.apache.http.wire=DEBUG"
)

fork in run := true

すると、以下のように DynamoDB との通信が 2回発生していることがわかる。

[error] 2014/10/20 01:09:19:440 JST [DEBUG] wire - >> "POST / HTTP/1.1[\r][\n]"
[error] 2014/10/20 01:09:19:441 JST [DEBUG] wire - >> "Host: dynamodb.ap-northeast-1.amazonaws.com[\r][\n]"
(snip)
[error] 2014/10/20 01:09:19:442 JST [DEBUG] wire - >> "{"TableName":"FoodLog","Limit":5,"ConsistentRead":false,"KeyConditions":{"UserId":{"AttributeValueList":[{"S":"user-1"}],"ComparisonOperator":"EQ"}},"ScanIndexForward":false}"
[error] 2014/10/20 01:09:19:468 JST [DEBUG] wire - << "HTTP/1.1 200 OK[\r][\n]"
(snip)
[error] 2014/10/20 01:09:19:478 JST [DEBUG] wire - << "{"Count":5,"Items":[{"UserId":{"S":"user-1"},"Timestamp":{"N":"78548"},"food":{"S":"food-1"},"calorie":{"N":"1911"}},{"UserId":{"S":"user-1"},"Timestamp":{"N":"77366"},"food":{"S":"food-5"},"calorie":{"N":"158"}},{"UserId":{"S":"user-1"},"Timestamp":{"N":"74394"},"food":{"S":"food-10"},"calorie":{"N":"885"}},{"UserId":{"S":"user-1"},"Timestamp":{"N":"67632"},"food":{"S":"food-2"},"calorie":{"N":"974"}},{"UserId":{"S":"user-1"},"Timestamp":{"N":"64601"},"food":{"S":"food-7"},"calorie":{"N":"429"}}],"LastEvaluatedKey":{"Timestamp":{"N":"64601"},"UserId":{"S":"user-1"}},"ScannedCount":5}"
[info] FoodLog(user-1,78548,food-1,1911)
[info] FoodLog(user-1,77366,food-5,158)
[info] FoodLog(user-1,74394,food-10,885)
[info] FoodLog(user-1,67632,food-2,974)
[info] FoodLog(user-1,64601,food-7,429)
[error] 2014/10/20 01:09:19:504 JST [DEBUG] wire - >> "POST / HTTP/1.1[\r][\n]"
[error] 2014/10/20 01:09:19:504 JST [DEBUG] wire - >> "Host: dynamodb.ap-northeast-1.amazonaws.com[\r][\n]"
(snip)
[error] 2014/10/20 01:09:19:505 JST [DEBUG] wire - >> "{"TableName":"FoodLog","Limit":5,"ConsistentRead":false,"KeyConditions":{"UserId":{"AttributeValueList":[{"S":"user-1"}],"ComparisonOperator":"EQ"}},"ScanIndexForward":false,"ExclusiveStartKey":{"UserId":{"S":"user-1"},"Timestamp":{"N":"64601"}}}"
[error] 2014/10/20 01:09:19:533 JST [DEBUG] wire - << "HTTP/1.1 200 OK[\r][\n]"
(snip)
[error] 2014/10/20 01:09:19:533 JST [DEBUG] wire - << "{"Count":5,"Items":[{"UserId":{"S":"user-1"},"Timestamp":{"N":"55271"},"food":{"S":"food-9"},"calorie":{"N":"1434"}},{"UserId":{"S":"user-1"},"Timestamp":{"N":"34756"},"food":{"S":"food-3"},"calorie":{"N":"1639"}},{"UserId":{"S":"user-1"},"Timestamp":{"N":"15595"},"food":{"S":"food-4"},"calorie":{"N":"937"}},{"UserId":{"S":"user-1"},"Timestamp":{"N":"9615"},"food":{"S":"food-6"},"calorie":{"N":"393"}},{"UserId":{"S":"user-1"},"Timestamp":{"N":"6847"},"food":{"S":"food-8"},"calorie":{"N":"1834"}}],"ScannedCount":5}"
[info] FoodLog(user-1,55271,food-9,1434)
[info] FoodLog(user-1,34756,food-3,1639)
[info] FoodLog(user-1,15595,food-4,937)
[info] FoodLog(user-1,9615,food-6,393)
[info] FoodLog(user-1,6847,food-8,1834)

改めてAPIマニュアル(DynamoDBQueryExpression (AWS SDK for Java - 1.9.1))を読む。

Sets the maximum number of items to retrieve in each service request to DynamoDB and returns a pointer to this object for method-chaining.

Note that when calling DynamoDBMapper.query, multiple requests are made to DynamoDB if needed to retrieve the entire result set. Setting this will limit the number of items retrieved by each request, NOT the total number of results that will be retrieved. Use DynamoDBMapper.queryPage to retrieve a single page of items from DynamoDB.

つまるところ、withLimit で指定しているのはクエリ全体の取得件数ではなく、
1回のリクエストで取得するサイズ (サービスリクエストにおける1ページのサイズ) なのである。

DynamoDBMapper.query メソッドは(ページ単位で遅延評価となる)全体の結果セットを返すため、
その結果に map や size などの横断的な処理を適用すると結果セット全体がスキャンされてしまう。

求める結果が最初のページだけでよければ、DynamoDBMapper.queryPage を利用するのが正解だ。

 

クエリ部分の正しい実装

ハイライト部分を修正。

  def readRecent(userId: String, limit: Int): Seq[FoodLog] = {
    val query = new DynamoDBQueryExpression[FoodLog]()
      .withHashKeyValues(FoodLog(userId, 0, null, 0))
      .withScanIndexForward(false)
      .withLimit(limit)
      .withConsistentRead(false)
    mapper.queryPage(classOf[FoodLog], query).getResults.asScala
  }

結果は想定通り、5個のみとなった。

scala> import com.github.mogproject.example.dynamodb.FoodLog
import com.github.mogproject.example.dynamodb.FoodLog

scala> FoodLog.readRecent("user-1", 5)
res0: Seq[com.github.mogproject.example.dynamodb.FoodLog] = Buffer(FoodLog(user-1,78548,food-1,1911), FoodLog(user-1,77366,food-5,158), FoodLog(user-1,74394,food-10,885), FoodLog(user-1,67632,food-2,974), FoodLog(user-1,64601,food-7,429))

scala> res0.size
res1: Int = 5

scala> res0 foreach println
FoodLog(user-1,78548,food-1,1911)
FoodLog(user-1,77366,food-5,158)
FoodLog(user-1,74394,food-10,885)
FoodLog(user-1,67632,food-2,974)
FoodLog(user-1,64601,food-7,429)

DynamoDBのスキャンが想定外に繰り返されると、応答が遅くなるだけでなく
読み込みキャパシティの限界突破のリスクも非常に高くなる。
このような落とし穴にはよくよく注意が必要である。

 

さいごに、簡単なベンチマークを行った。

100件のデータを用意し、「limit=5 の全ページスキャン」「limit=5 の最初のページのみスキャン」「limit=100 の最初のページのみスキャン」の所要時間を測定したところ、結果は以下のようになった。

  • limit=5 の全ページスキャン                    : 340msec
  • limit=5 の最初のページのみスキャン      :  19msec
  • limit=100 の最初のページのみスキャン  :  28msec

やはり、DynamoDBに対してHTTP通信を繰り返す(上記の例では20回)のは非常にコストが高い。
全ページのスキャンを行うくらいなら、最初から limit を引き上げたほうが得策だろう。

 

 

Source code

 

References

0 件のコメント:

コメントを投稿