へろへろもへじ

(ブログタイトル募集中)

AWS SDK for PHP 2を利用してAmazon Kinesisを操作してみた

Amazon Kinesisについて調べて、ちょっと遊んでみたのでメモ。

Amazon Kinesisとは?

公式ページにある動画を見るとイメージしやすいかもです。
Amazon Kinesis - フルマネージド型大規模ストリーミングデータ処理サービス | アマゾン ウェブ サービス(AWS 日本語)

ちなみに2014/2/23時点で、選択できるリージョンはVirginiaのみです。

お値段

$0.015 /1 時間(1shard)
$0.028 /1,000,000レコード

安い!!!!!東京リージョンのEC2のt1.microですら$0.027 /1 時間なので、どれだけ安価かわかりますね。

登場人物

Stream

後述のData RecordとShardを管理する大元。

Data Record

Kinesisに格納されるデータの単位のこと。最大サイズ50Kバイトの制限あり。
各Data RecordにはSequence Number(一意)が割り当てられ、Shardに分散される。

Shard

Stream内で水平分割する仕組み。1Shardにつき、
READ:最大2M/sec、同時接続数5
WRITE:最大1M/sec、同時接続数1000
の制限あり。
オンラインでShardの増加が可能。
Shardは自身を管理するShard Idを持つ。
Shard内のData RecordのポジションのことをShard Iteratorという。(たぶんそんなイメージ...)
Shard内のData Recordの生存期間は24時間。

Partition Key

Data Recordが属するShardを決定するためのキー。最大256バイトUnicode文字列という制限あり。

Sequence Number

各Data Recordに自動で割り当てられる番号。時間の経過に比例して増加するが、putが立て続けに発生した場合、順序は正確にリクエストの順序に対応しない場合がある。厳密に保証したい場合、「SequenceNumberForOrdering」というパラメータを使用する必要がある。

Kinesisを利用するために

下記資料を元に、管理コンソール上からポチポチ1分くらいで完了です。
Step 1: Create an Amazon Kinesis Stream - Amazon Kinesis

AWS SDK for PHP 2でKinesisを操作してみる

以下のようなサンプルを作ってみました。

  1. 適当なデータを1秒間隔でputする
  2. 1.でputしたデータをgetして出力する

なお、AWS SDKはComposerでダウンロードしたバージョン2.5.2を利用しています。

■put.php(適当なデータを1秒間隔でputする)

<?php
require './vendor/autoload.php';

use Aws\Kinesis\KinesisClient;
use Aws\Common\Enum\Region;

$kinesis = KinesisClient::factory(array(
  'key' => 'your key',
  'secret' => 'your secret',
  'region' => Region::VIRGINIA
));

while (true) {
    $sample_data = date('YmdHis');

    $result = $kinesis->putRecord(array(
        'StreamName' => 'hoge-stream',
        'Data' => $sample_data,
        'PartitionKey' => 'kinesis-sample'
    ));

    echo "put data:$sample_data" . PHP_EOL;
    
    sleep(1);
}

■get.php(1.でputしたデータをgetして出力する)

<?php
require './vendor/autoload.php';

use Aws\Kinesis\KinesisClient;
use Aws\Common\Enum\Region;

$kinesis = KinesisClient::factory(array(
  'key' => 'your key',
  'secret' => 'your secret',
  'region' => Region::VIRGINIA
));

$shard_iterator_info = $kinesis->getShardIterator(array(
    'StreamName' => 'hoge-stream',
    'ShardId' => 'shardId-000000000000',
    'ShardIteratorType' => 'TRIM_HORIZON',
));

$shard_iterator = null;
while (true) {
    if (is_null($shard_iterator)) {
        $shard_iterator = $shard_iterator_info['ShardIterator'];
    } else {
        $shard_iterator = $records_info['NextShardIterator'];
    }
    
    $records_info = $kinesis->getRecords(array(
        'ShardIterator' => $shard_iterator,
        'Limit' => 100
    ));

    $records = $records_info['Records'];
    if (!empty($records)) {
        // Recordsは必ずしも取れるわけではない
        var_dump($records);
    }

    sleep(1);
}

こんな感じでできました。ちなみに、EC2上ではなく、ローカルからも接続できます。

所感など

今回作ったサンプルは全く実用的ではないです。実務で使うとなるとどこまで処理したのかを管理...つまりShard IdSequence Numberを管理する必要が出てくると思います。
で、、、上記はAmazon Kinesis Client Library(awslabs/amazon-kinesis-client · GitHub)というライブラリを利用すればこのへんの管理をいい感じにやってくれるみたい(裏でDynamoDBを使っている)なのですが、現状Javaしかサポートされていないので、Java以外の言語を利用する場合は独自に実装する必要があります。Java以外の言語もサポートされれば、安価で運用も楽そうなので、一気に流行りそう!!(例えば、EC2で運用しているFluentdのAgregatorをKinesisに代替するみたいな使い方ができそう)

ってことで、もうちょっとツッコんで色々試してみたいと思います。
Amazon Kinesis Client Libraryの実装見てみます)