12.31.2015

Python: Do Not Use Pool.map Method in multiprocessing Module

Python: multiprocessing モジュールの Pool.map を使ったときの罠

 

Pool.map を使った並行処理をバッチ処理などで実行した際、キーボードによる中断(KeyboardInterrupt)をすると
プロセスがハングアップすることがある。

コード
#!/usr/bin/env python

from multiprocessing import Pool, current_process
import time
from datetime import datetime

def f(x):
    print('[%s] start %d: %s' % (datetime.now(), x, current_process().name))
    time.sleep(5)
    print('[%s] end   %d: %s' % (datetime.now(), x, current_process().name))
    return x

pool = Pool(2)
ret = pool.map(f, range(4))
print('[%s] result: %s' % (datetime.now(), ret))
通常の実行例
[2015-12-31 22:10:02.319094] start 0: PoolWorker-1
[2015-12-31 22:10:02.319267] start 1: PoolWorker-2
[2015-12-31 22:10:07.320538] end   1: PoolWorker-2
[2015-12-31 22:10:07.320538] end   0: PoolWorker-1
[2015-12-31 22:10:07.321345] start 2: PoolWorker-2
[2015-12-31 22:10:07.321434] start 3: PoolWorker-1
[2015-12-31 22:10:12.321765] end   2: PoolWorker-2
[2015-12-31 22:10:12.321765] end   3: PoolWorker-1
[2015-12-31 22:10:12.322721] result: [0, 1, 2, 3]
Ctrl-C を押した場合
[2015-12-31 22:05:42.108238] start 0: PoolWorker-1
[2015-12-31 22:05:42.109395] start 1: PoolWorker-2
^CProcess PoolWorker-1:
Process PoolWorker-2:
Traceback (most recent call last):
Traceback (most recent call last):
(snip)
    return map(*args)
    time.sleep(5)
  File "./x.py", line 9, in f
KeyboardInterrupt
    time.sleep(5)
KeyboardInterrupt
[2015-12-31 22:05:42.631444] start 2: PoolWorker-3
[2015-12-31 22:05:42.632162] start 3: PoolWorker-4
[2015-12-31 22:05:47.632713] end   3: PoolWorker-4
[2015-12-31 22:05:47.632664] end   2: PoolWorker-3

プログラムが停止せず、kill コマンドでプロセスを終了する必要に迫られる。
何度 Ctrl-C を押しても駄目である。

回避策1

try 節で KeyboardInterrupt をトラップすれば、プログラムのハングアップは防げる。
しかし、中断した後もプログラムが続行してしまう。

#!/usr/bin/env python

from multiprocessing import Pool, current_process
import time
from datetime import datetime

def f(x):
    try:
        print('[%s] start %d: %s' % (datetime.now(), x, current_process().name))
        time.sleep(5)
        print('[%s] end   %d: %s' % (datetime.now(), x, current_process().name))
    except KeyboardInterrupt:
        pass
    return x

pool = Pool(2)
ret = pool.map(f, range(4))
print('[%s] result: %s' % (datetime.now(), ret))

実行例

[2015-12-31 22:21:41.647775] start 0: PoolWorker-1
[2015-12-31 22:21:41.647928] start 1: PoolWorker-2
^C[2015-12-31 22:21:42.716623] start 2: PoolWorker-2
[2015-12-31 22:21:42.716755] start 3: PoolWorker-1
[2015-12-31 22:21:47.717815] end   2: PoolWorker-2
[2015-12-31 22:21:47.717832] end   3: PoolWorker-1
Traceback (most recent call last):
(snip)
    waiter.acquire()
KeyboardInterrupt
回避策2

map の代わりに map_async を使えば、問題を回避できる。この場合は即座にプログラムが終了する。
ただし、結果を利用する際には get メソッドとともにタイムアウトの時間(秒数)を指定する必要がある。

#!/usr/bin/env python

from multiprocessing import Pool, current_process
import time
from datetime import datetime

def f(x):
    print('[%s] start %d: %s' % (datetime.now(), x, current_process().name))
    time.sleep(5)
    print('[%s] end   %d: %s' % (datetime.now(), x, current_process().name))
    return x

pool = Pool(2)
p = pool.map_async(f, range(4))
ret = p.get(86400)
print('[%s] result: %s' % (datetime.now(), ret))

実行例

[2015-12-31 22:24:32.482984] start 0: PoolWorker-1
[2015-12-31 22:24:32.483816] start 1: PoolWorker-2
^CTraceback (most recent call last):
(snip)
    time.sleep(5)
KeyboardInterrupt

 

References

12.21.2015

color-ssh: Runs Remote Commands, Colorfully!

color-ssh: カラフルなリモートコマンドの実行ツール

 

color-ssh というコマンドライン・ツールを作った。

 

インストール

# pip install color-ssh
  • 環境によっては「sudo」コマンドが必要
  • バージョンアップする場合は、「pip install --upgrade color-ssh」

 

「color-cat」と「color-ssh」という 2個のコマンドがインストールされる。
以下に、基本的な使い方を説明する。

$ color-cat --version
color-cat 0.1.0
$ color-ssh --version
color-ssh 0.1.0

 

color-cat

UNIX/Linux の cat コマンドにターミナル上で色を付けるだけのツール。

x

  • 引数なしで実行すれば、標準入力から読み取った文字列が、ターミナルに赤色で表示される。
  • 「-l」オプションでラベルを指定。各行の左側に反転色で表示される。
    • ラベルを変えれば色も変わる。
      ラベルの文字列によって、色が一意に決まるというのが最大の特徴。
  • 「-c」オプションで色を直接指定することも可能。
  • 「-s」オプションでセパレータ文字列も変えられる。
  • もちろん、通常の cat コマンド同様、引数にファイルパスを指定すれば、その内容を全て表示する。
  • 「color-cat --help」でヘルプを表示。

 

color-ssh

ssh コマンドの出力を色付きで表示するためのツール。
内部では color-cat コマンドが実行されている。

x

  • ssh コマンドを使ってリモートサーバ上でコマンドを実行し、その出力を色付きで表示する。
  • サーバの名前が color-cat のラベルとして利用される。そのため同じサーバは同じ色で出力される。
  • 標準エラー出力はセパレータが「+」に変わる。
  • 複数のサーバ上で、同じコマンドを同時に実行することが可能。(parallel-ssh と同じような機能)
    • ホスト名のリストは、「-h」オプションでファイルから読み込むか、「-H」オプションで文字列として指定する。
  • 「-p」オプションで多重度を制限可能。
  • 「color-ssh --help」でヘルプを表示。

 

コマンドライン引数の「分配」

「--distribute」オプションを指定すれば、引数を各サーバに分割・分配して実行することができる。
(これをやりたかった)

x

上記の例では、「a」「b」「c」「d」「e」という 5個の引数を、各サーバにほぼ均等になるように割り当てて、そのコマンドを並行実行する。

server-1 では「echo xxx a b c」
server-2 では「echo xxx d e」

というコマンドが同時に実行されている。

この機能は、多数の入力ファイルと複数のサーバがある場合、手軽に仕事を分割するのに役立つと思う。

12.18.2015

How to Handle Binary-data stdin/stdout and Command-line Arguments in Python3

Python: 標準入出力およびコマンドライン引数でバイナリデータを取り扱う方法

 

Python2 の場合

str = bytes なので、普通に書けばバイナリデータにも対応できる。

import sys

sys.stdout.write('### sys.argv\n')
for arg in sys.argv:
    sys.stdout.write(arg)
    sys.stdout.write('\n')

sys.stderr.write('### sys.stdin\n')
for line in iter(sys.stdin.readline, ''):
    sys.stderr.write(line.rstrip())
    sys.stderr.write('\n')
  • 実行例 (?? の箇所は表示非対応)
$ echo $'abc\nあいう\n\xff\xfe' | python2 ./bin_stdin.py abc あいう $'\xff\xfe'
### sys.argv
./bin_stdin.py
abc
あいう
??
### sys.stdin
abc
あいう
??

 

Python3 の場合

上記のコードはエンコードエラーになる。
これは、Python3 の sys.stdout.write が str = unicode を引数に取るため。

$ echo $'abc\nあいう\n\xff\xfe' | python3 ./bin_stdin.py abc あいう $'\xff\xfe'
### sys.argv
./bin_stdin.py
abc
あいう
Traceback (most recent call last):
  File "./bin_stdin.py", line 5, in
    sys.stdout.write(arg)
UnicodeEncodeError: 'utf-8' codec can't encode character '\udcff' in position 0: surrogates not allowed

 

ポイント
  • 1. sys.stdXXX.buffer を利用する
  • 2. sys.args の要素を os.fsencode でエンコードする
    • ただしOptionParserなどのパース処理はエンコード前に適用しないと正しく処理されない模様

以下のようなコードを書けば、Python2/3 両方に対応できる。 (flush は必ずしも必須ではない)

import sys
import os

PY3 = sys.version_info >= (3, )

stdin = sys.stdin.buffer if PY3 else sys.stdin
stdout = sys.stdout.buffer if PY3 else sys.stdout
stderr = sys.stderr.buffer if PY3 else sys.stderr

stdout.write(b'### sys.argv\n')
for arg in sys.argv:
    stdout.write(os.fsencode(arg) if PY3 else arg)
    stdout.write(b'\n')
    stdout.flush()

stderr.write(b'### sys.stdin\n')
for line in iter(stdin.readline, b''):
    stderr.write(line.rstrip())
    stderr.write(b'\n')
    stderr.flush()
  • 実行例
$ echo $'abc\nあいう\n\xff\xfe' | python3 ./bin_stdin.py abc あいう $'\xff\xfe'
### sys.argv
./bin_stdin.py
abc
あいう
??
### sys.stdin
abc
あいう
??

12.17.2015

How to Color the Output from SSH Commands

SSH の出力をカラフルにする方法

 

SSH に限ったことではないが、ターミナルに表示されるテキストを目的に応じて着色したい場合、
パイプラインと sed を使うのが一番簡単だ。

$ ssh server-1 'python -c "import this"' \
  | sed -e $'s/^\\(.*\\)$/\e[96mserver-1\e[0m|\e[36m\\1\e[0m/'

出力は以下のようになる。
x

\e[96m や \e[36m の部分を変更すれば、色が変わる。

この応用として、複数のサーバに対して SSH をバックグラウンドで実行し(コマンドラインの末尾に「&」を付ける)
最後に wait コマンドで処理を待つようなシェルを書けば、サーバに応じた色のアウトプットをリアルタイムに観察できる。
y

12.16.2015

Manipulating Many Servers by Using parallel-ssh

parallel-ssh を使って複数サーバを同時に操作する

 

tl; dr

  • 2台〜100台程度のサーバに対して、同じオペレーションを同時に実行したい
  • parallel-ssh (pssh) の各種コマンドを使うと便利
  • ただし、リモートサーバで sudo が必要な場合は Ansible 等の他ツールが必要

 

parallel-ssh のセットアップ

 

インストール

parallel-ssh - PSSH: Parallel SSH Tools - Google Project Hosting
  • Mac
    $ brew install pssh
  • RHEL系
    # yum install pssh
ホストリストの準備
  • 任意のパスにテキストファイルを作成
    server-1
    server-2
    server-3 root
  • 1行ずつ、接続先サーバのホスト名またはIPアドレスを記述
  • ログインユーザの指定も可能

 

目的別コマンド実行例

 

SSH 認証のための設定

parallel-ssh を使うには、各サーバに SSH で接続できることが前提となる。
ssh-copy-id コマンドを利用して、~/.ssh/id_rsa の内容を登録する場合は以下。

$ for h in $(cat ~/hosts); do ssh-copy-id $h; done

 

pssh の使い方
  • 基本形
    $ pssh -h ~/hosts -i 'hostname'
    [1] 04:09:13 [SUCCESS] server-1
    server-1
    [2] 04:09:13 [SUCCESS] server-2
    server-2
  • ホスト名を直接指定
    $ pssh -H server-1 -H server-2 -i 'hostname'
    [1] 04:09:13 [SUCCESS] server-1
    server-1
    [2] 04:09:13 [SUCCESS] server-2
    server-2
  • タイムアウトを無制限に
    $ pssh -h ~/hosts -t 0 -i 'hostname'
    [1] 04:09:13 [SUCCESS] server-1
    server-1
    [2] 04:09:13 [SUCCESS] server-2
    server-2
  • アウトプットをすぐに表示
    $ pssh -h ~/hosts -P 'hostname'
    server-1: server-1
    [1] 04:13:28 [SUCCESS] server-1
    server-2: server-2
    [2] 04:13:28 [SUCCESS] server-2
    
  • リモートでバックグラウンド実行
    $ pssh -h ~/hosts -i 'nohup your_command > /path/to/output 2>&1 </dev/null &'
    [1] 04:13:28 [SUCCESS] server-1
    [2] 04:13:28 [SUCCESS] server-2
    
    バッファの影響で、標準出力の出力先ファイルがリアルタイムに更新されない場合がある。
    対応にはハックが必要: linux - redirect nohup stdout and flush - Stack Overflow

 

ファイル転送
  • ローカルからリモートに転送
    $ prsync -h ~/hosts -av /data/input/ /data/input/
    • パスの末尾にスラッシュを付けると、ディレクトリ間の同期となる。(rsync の仕様)
    • -z (圧縮オプション) は付けると CPU がボトルネックになって大幅に性能が劣化する場合がある。ナローバンドでなければ、付けないほうが良さそうだ。
  • リモートからローカルに転送
    $ pssh -h ~/hosts -p 4 -i 'rsync -e "ssh -o StrictHostKeyChecking=no" -av \
    /data/output/ server-1:/data/output/'
    • 全サーバから server-1 に集約する例
    • -p オプションを指定して、並列度を制限している。
      こうしないと、コネクションが多すぎて接続できない場合がある。
    • rsync -e "ssh -o ..." という書式で SSH オプションを指定。
      ここではホストキーのチェックを回避している。

 

リモートサーバで sudo が必要な場合
  • Ansible で代用
    $ ansible -i ~/hosts all --sudo -K \
    -a 'bash -c "cd /path/to/your/app && make install"'
    

 

References

12.08.2015

Python: How to Execute Parallel Processing using multiprocessing.Pool

Python: multiprocessing.Pool を使った並列処理の実行

 

Python で並列処理を行う場合、自分でスレッドを書くこともできるが、グローバルインタプリタロック (GIL) の制約を受けて意図しない結果に陥ることが少なくない。

multithreading モジュールの Pool を利用するのが最も簡単かつ安全である。

 

基本形

引数を1個取る関数を定義し、Pool#map を実行する。

import time
from datetime import datetime
from multiprocessing import Pool, current_process

def f(x):
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d' % (proc_name, datetime.now(), x))
    time.sleep(1)
    print('(%s) [%s] Ended  : x=%d' % (proc_name, datetime.now(), x))
    return x * x

pool = Pool(4)
print(pool.map(f, range(8)))

尚、Python 3.4 以降なら with 句を使った表現もできる。

実行結果は以下のようになるはずだ。

(PoolWorker-1) [2015-12-08 01:49:03.666114] Started: x=0
(PoolWorker-2) [2015-12-08 01:49:03.666246] Started: x=1
(PoolWorker-3) [2015-12-08 01:49:03.666400] Started: x=2
(PoolWorker-4) [2015-12-08 01:49:03.666620] Started: x=3
(PoolWorker-1) [2015-12-08 01:49:04.667421] Ended  : x=0
(PoolWorker-3) [2015-12-08 01:49:04.667421] Ended  : x=2
(PoolWorker-2) [2015-12-08 01:49:04.667423] Ended  : x=1
(PoolWorker-4) [2015-12-08 01:49:04.667511] Ended  : x=3
(PoolWorker-3) [2015-12-08 01:49:04.668070] Started: x=4
(PoolWorker-2) [2015-12-08 01:49:04.668151] Started: x=5
(PoolWorker-1) [2015-12-08 01:49:04.668282] Started: x=6
(PoolWorker-4) [2015-12-08 01:49:04.668382] Started: x=7
(PoolWorker-3) [2015-12-08 01:49:05.669206] Ended  : x=4
(PoolWorker-2) [2015-12-08 01:49:05.669206] Ended  : x=5
(PoolWorker-4) [2015-12-08 01:49:05.669218] Ended  : x=7
(PoolWorker-1) [2015-12-08 01:49:05.669230] Ended  : x=6
[0, 1, 4, 9, 16, 25, 36, 49]

実行中はプロセスがフォークされているので、特にメモリ使用量は気にかける必要がある。

 

複数のパラメータを取る

タプルを渡して、ワーカー側で分解するのが常套手段。

import time
from datetime import datetime
from multiprocessing import Pool, current_process

def f(args):
    x, y = args
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    time.sleep(1)
    print('(%s) [%s] Ended  : x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    return x * y

pool = Pool(4)
print(pool.map(f, zip(range(8), range(10, 18))))

実行例

(PoolWorker-1) [2015-12-08 01:55:01.700005] Started: x=0, y=10
(PoolWorker-2) [2015-12-08 01:55:01.700078] Started: x=1, y=11
(PoolWorker-3) [2015-12-08 01:55:01.700196] Started: x=2, y=12
(PoolWorker-4) [2015-12-08 01:55:01.700463] Started: x=3, y=13
(PoolWorker-3) [2015-12-08 01:55:02.701283] Ended  : x=2, y=12
(PoolWorker-2) [2015-12-08 01:55:02.701275] Ended  : x=1, y=11
(PoolWorker-1) [2015-12-08 01:55:02.701261] Ended  : x=0, y=10
(PoolWorker-4) [2015-12-08 01:55:02.701293] Ended  : x=3, y=13
(PoolWorker-3) [2015-12-08 01:55:02.701914] Started: x=4, y=14
(PoolWorker-1) [2015-12-08 01:55:02.702046] Started: x=5, y=15
(PoolWorker-2) [2015-12-08 01:55:02.702165] Started: x=6, y=16
(PoolWorker-4) [2015-12-08 01:55:02.702361] Started: x=7, y=17
(PoolWorker-1) [2015-12-08 01:55:03.703123] Ended  : x=5, y=15
(PoolWorker-3) [2015-12-08 01:55:03.703106] Ended  : x=4, y=14
(PoolWorker-4) [2015-12-08 01:55:03.703131] Ended  : x=7, y=17
(PoolWorker-2) [2015-12-08 01:55:03.703140] Ended  : x=6, y=16
[0, 11, 24, 39, 56, 75, 96, 119]

 

ワーカーのスタックトレースを出力する

何らかの例外が発生する場合を考える。

import time
from datetime import datetime
from multiprocessing import Pool, current_process

def f(args):
    x, y = args
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    time.sleep(1)
    print('(%s) [%s] Ended  : x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    return y / x

pool = Pool(4)
print(pool.map(f, zip(range(8), range(10, 18))))

そのままだと、スタックトレースの内容が分かりづらい。

(PoolWorker-1) [2015-12-08 01:59:54.088732] Started: x=0, y=10
(PoolWorker-2) [2015-12-08 01:59:54.088835] Started: x=1, y=11
(PoolWorker-3) [2015-12-08 01:59:54.088960] Started: x=2, y=12
(PoolWorker-4) [2015-12-08 01:59:54.089304] Started: x=3, y=13
(PoolWorker-3) [2015-12-08 01:59:55.090067] Ended  : x=2, y=12
(PoolWorker-2) [2015-12-08 01:59:55.090095] Ended  : x=1, y=11
(PoolWorker-1) [2015-12-08 01:59:55.090049] Ended  : x=0, y=10
(PoolWorker-4) [2015-12-08 01:59:55.090160] Ended  : x=3, y=13
(PoolWorker-2) [2015-12-08 01:59:55.090750] Started: x=4, y=14
(PoolWorker-3) [2015-12-08 01:59:55.090926] Started: x=5, y=15
(PoolWorker-4) [2015-12-08 01:59:55.091020] Started: x=6, y=16
(PoolWorker-1) [2015-12-08 01:59:55.091336] Started: x=7, y=17
Traceback (most recent call last):
  File "xxx.py", line 14, in
    print(pool.map(f, zip(range(8), range(10, 18))))
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
ZeroDivisionError: integer division or modulo by zero

デコレータでスタックトレースを表示してみる。副産物として、関数 f が複数のパラメータを取れるようになった。

import time
from datetime import datetime
from multiprocessing import Pool, current_process
import traceback

def with_stacktrace(func):
    import functools

    @functools.wraps(func)
    def wrapper(args):
        try:
            return func(*args)
        except:
            proc_name = current_process().name
            for line in traceback.format_exc().splitlines():
                print('[TRACE:%s] %s' % (proc_name, line))
            raise
    return wrapper

@with_stacktrace
def f(x, y):
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    time.sleep(1)
    print('(%s) [%s] Ended  : x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    return y / x

pool = Pool(4)
print(pool.map(f, zip(range(8), range(10, 18))))

実行例

(PoolWorker-1) [2015-12-08 02:31:36.959575] Started: x=0, y=10
(PoolWorker-2) [2015-12-08 02:31:36.959657] Started: x=1, y=11
(PoolWorker-3) [2015-12-08 02:31:36.959721] Started: x=2, y=12
(PoolWorker-4) [2015-12-08 02:31:36.960174] Started: x=3, y=13
(PoolWorker-4) [2015-12-08 02:31:37.960911] Ended  : x=3, y=13
(PoolWorker-2) [2015-12-08 02:31:37.960904] Ended  : x=1, y=11
(PoolWorker-3) [2015-12-08 02:31:37.960920] Ended  : x=2, y=12
(PoolWorker-1) [2015-12-08 02:31:37.960888] Ended  : x=0, y=10
(PoolWorker-2) [2015-12-08 02:31:37.961423] Started: x=4, y=14
(PoolWorker-3) [2015-12-08 02:31:37.961634] Started: x=5, y=15
(PoolWorker-4) [2015-12-08 02:31:37.961724] Started: x=6, y=16
[TRACE:PoolWorker-1] Traceback (most recent call last):
[TRACE:PoolWorker-1]   File "xxx.py", line 9, in f
[TRACE:PoolWorker-1]     return func(*args)
[TRACE:PoolWorker-1]   File "xxx.py", line 23, in f
[TRACE:PoolWorker-1]     return y / x
[TRACE:PoolWorker-1] ZeroDivisionError: integer division or modulo by zero
(PoolWorker-1) [2015-12-08 02:31:37.962384] Started: x=7, y=17
Traceback (most recent call last):
  File "xxx.py", line 26, in
    print(pool.map(f, zip(range(8), range(10, 18))))
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
ZeroDivisionError: integer division or modulo by zero

これでデバッグが捗る。

 

 

References

11.30.2015

C++/Python: Writing Custom Converters in Boost.Python

C++/Python: Boost.Python でカスタムコンバーターを作る

 

C++ のクラスを Python で扱うとき、std::pair などのクラスはデフォルトでは型変換が行われない。自分でコンバーターを書く必要がある。

 

以下はコードの例。C++11のラムダ式を利用したら綺麗にまとまった。

#include <boost/python.hpp>
#include "your_code.hpp"


namespace yourmodulename {
  namespace py = boost::python;

  /*
   * Converter for result set (std::vector of std::pair => pylist of pytuple)
   */
  namespace detail {
    template <typename T, typename U>
    struct pair_vector_to_tuple_list {
      static PyObject* convert(std::vector<std::pair<T, U> > const& ps) {
        py::list ret;
        for (auto it: ps) ret.append(py::make_tuple(it.first, it.second));
        return py::incref(ret.ptr());
      }

      static PyTypeObject const* get_pytype() { return &PyList_Type; }
    };
  }

  template <typename T, typename U>
  void expose_pair_vector_to_tuple_list() {
    py::to_python_converter<std::vector<std::pair<T, U> >, detail::pair_vector_to_tuple_list<T, U>, true>();
  }

  /*
   * Converter for list parameter (pylist => std::vector)
   */
  template <typename T>
  void expose_pylist_to_vector() {
    typedef std::vector<T> VT;

    auto convertible = [](PyObject* obj_ptr) -> void* { return PySequence_Check(obj_ptr) ? obj_ptr : NULL; };

    auto construct = [](PyObject* obj_ptr, py::converter::rvalue_from_python_stage1_data* data) {
      VT* storage = new (reinterpret_cast<py::converter::rvalue_from_python_storage<VT>*>(data)->storage.bytes) VT();
      for (py::ssize_t i = 0, l = PySequence_Size(obj_ptr); i < l; ++i) {
        storage->push_back(py::extract<typename boost::range_value<VT>::type>(PySequence_GetItem(obj_ptr, i)));
      }
      data->convertible = storage;
    };

    py::converter::registry::push_back(convertible, construct, py::type_id<VT>());
  }

}


BOOST_PYTHON_MODULE(yourmodulename) {
  using namespace boost::python;
  using namespace yourmodulename;

  // expose converters
  expose_pair_vector_to_tuple_list<int, double>();
  expose_pylist_to_vector<int>();

  // expose classes
  ...
}

なかなかに魔力を吸い取られそうなコードだ。

 

 

Related Posts

 

References

11.27.2015

Top N Sorter in Python

Python: トップNソートの実装

 

ソート可能な iterable に対して、降順の上位 N件のソート結果を取得したい。

heapq を使えば効率が良さそうだ。

 

コード

import heapq


class TopNSorter(object):
    def __init__(self, limit):
        self._heap = []
        self.limit = limit

    def insert(self, item):
        if len(self._heap) == self.limit:
            heapq.heappushpop(self._heap, item)
        else:
            heapq.heappush(self._heap, item)

    def result(self):
        h = self._heap[:]
        buf = []
        while h:
            buf.insert(0, heapq.heappop(h))
        return buf

 

実行例

>>> s = TopNSorter(5)
>>> s.result()
[]
>>> s.insert(10)
>>> s.result()
[10]
>>> s.insert(20)
>>> s.result()
[20, 10]
>>> s.insert(10)
>>> s.result()
[20, 10, 10]
>>> s.insert(5)
>>> s.result()
[20, 10, 10, 5]
>>> s.insert(30)
>>> s.result()
[30, 20, 10, 10, 5]
>>> s.insert(15)
>>> s.result()
[30, 20, 15, 10, 10]
>>> s.insert(10)
>>> s.result()
[30, 20, 15, 10, 10]
>>> s.insert(25)
>>> s.result()
[30, 25, 20, 15, 10]
>>> s.insert(1)
>>> s.result()
[30, 25, 20, 15, 10]

 

Related Posts

11.22.2015

Finding Community Structure in a Big Graph

C++: ビッググラフにおけるコミュニティ抽出のメモ

 

手法

ネットワークにおけるコミュニティ抽出 (グラフのクラスタリング) にはいくつかの手法がある。

こちらを参照 => R seminar on igraph

スパースなグラフに対する Non-overlapping で最も高速なのは、貪欲アルゴリズムを用いた手法 (O(n (log n)^2))。
今回はこちらを採用。

 

実装

R であれば igraph – Network analysis software に既に実装がある。

Python の NetworkX では過去に議論された形跡が見られるものの、取り込まれる見込みは薄そうだ。

GraphX には、もちろん無い。

今回はとにかく消費メモリを節約したかったので、論文作者本人が公開している C++ の実装を使うことにする。

 

ビルド

上記の URL で公開されているソースコードをビルドする。(ライセンスが GPL なので注意)

コンパイルオプションなどが今日のコンパイラに追随していなかったので、若干の修正を要した。

  • Makefile (-fforce-mem オプション削除)
@@ -8,7 +8,7 @@

 # compiler name and flags
 CCC = g++
-CCFLAGS = -O3 -fomit-frame-pointer -funroll-loops -fforce-mem -fforce-addr -fexpensive-optimizations
+CCFLAGS = -O3 -fomit-frame-pointer -funroll-loops -fforce-addr -fexpensive-optimizations

 # loader flags
 LDFLAGS =
  • fastcommunity_mh.cc (iostream.h -> iostream, string.h 追加)
@@ -71,12 +71,13 @@
 //
 ////////////////////////////////////////////////////////////////////////

-#include <iostream.h>
+#include <iostream>
 #include <fstream>
 #include <string>
 #include "stdlib.h"
 #include "time.h"
 #include "math.h"
+#include "string.h"

 #include "maxheap.h"
 #include "vektor.h"
  • maxheap.h (iostream.h -> iostream, namespace std 追加)
@@ -40,7 +40,8 @@
 #if !defined(MAXHEAP_INCLUDED)
 #define MAXHEAP_INCLUDED

-#include <iostream.h>
+#include <iostream>
+using namespace std;

 /*   Because using this heap requires us to be able to modify an arbitrary element's
        data in constant O(1) time, I use to tricky tactic of having elements in an array-
  • vektor.h (iostream.h -> iostream)
@@ -30,7 +30,7 @@
 #if !defined(vektor_INCLUDED)
 #define vektor_INCLUDED

-#include <iostream.h>
+#include <iostream>

 #if !defined(vektor_INCLUDED)
 #define vektor_INCLUDED

その後、ディレクトリ内で make コマンドを行えば、実行ファイル FastCommunityMH が生成される。

 

実行

まずは有名(?)な空手クラブの派閥争いのデータで動作確認してみる。

$ curl -O http://tuvalu.santafe.edu/~aaronc/hierarchy/karate.pairs
$ ./FastCommunity_GPL_v1.0.3/FastCommunityMH -f ./karate.pairs -l firstRun

以下の 2ファイルが成果物としてカレントディレクトリに出力される。

FASTCOMMUNITY_INFERENCE_ALGORITHM
START-----:     Sun Nov 22 01:54:13 2015
---FILES--------
D_IN------:     ./
D_OUT-----:     ./
F_IN------:     karate.pairs
F_JOINS---:     karate-fc_firstRun.joins
F_INFO----:     karate-fc_firstRun.info
---NET_STATS----
MAXID-----:     34
NUMNODES--:     34
NUMEDGES--:     78
---MODULARITY---
MAXQ------:     0.383136
STEP------:     32
EXIT------:     Sun Nov 22 01:54:13 2015
-1      -1      -0.0498028      0
17      6       -0.0376397      1
6       7       -0.0139711      2
7       1       -0.00147929     3
5       1       0.0177515       4
11      1       0.0490631       5
27      30      0.0612262       6
30      34      0.0784845       7
24      34      0.0946746       8
28      34      0.111111        9
26      25      0.123192        10
25      32      0.145874        11
13      4       0.157709        12
22      2       0.16905 13
31      9       0.180227        14
9       33      0.196992        15
10      3       0.208169        16
18      2       0.219181        17
12      1       0.229372        18
8       4       0.239563        19
4       3       0.253369        20
14      3       0.269149        21
2       3       0.289448        22
29      32      0.29931 23
32      34      0.311144        24
23      33      0.320513        25
19      33      0.329553        26
21      33      0.338264        27
33      34      0.349359        28
16      34      0.362837        29
15      34      0.375986        30
1       20      0.380671        31
20      3       0.383136        32

この *.joins ファイルを追っていけば、Q値の推移、検出されたクラスタの状態を把握することができる。

-c オプション(特定ステップ時点の状態をファイルに出力)や -v --v オプション(詳細表示)も約に立つが、その度に全データの再計算が行われてしまうので、ビッググラフでは扱いづらい。

ノード数が増えたら、*.joins ファイルを解析するためのツールを作る必要があるだろう。

 

ビッググラフへの適用

300万ノード、1300万エッジのグラフに対して、上記のプログラムを実行してみた。

期待どおりメモリ使用量は 10GB 以内とかなり少なく抑えられている。

しかし、並列処理には向いていないアルゴリズムなので、それなりの実行時間はかかりそう。
おそらく、完了までは10日程度を要するであろう。

11.11.2015

Python: How to Avoid 'multiprocessing' TypeError on Exit

Python: multiprocessing TypeError の回避方法

 

Python 2.6 でテスト (python setup.py test) をしたとき、最後に以下のようなエラーが出ることがある。

Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/opt/python/2.6.9/lib/python2.6/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/opt/python/2.6.9/lib/python2.6/multiprocessing/util.py", line 258, in _exit_function
    info('process shutting down')
TypeError: 'NoneType' object is not callable

回避策は、以下に従い setup.py に記述をすること。

try:
    # Work around a traceback on Python < 2.7.4 and < 3.3.1 
    # http://bugs.python.org/issue15881#msg170215
    import multiprocessing  # noqa: unused
except ImportError:
    pass

コメントに noqa を付けると、pep8 などのチェックを回避できる。

11.08.2015

Universal Use of 'subprocess' Module in Python

Python: バージョン・OS 透過的な subprocess モジュールの利用

 

Python のバージョン (2.6 / 2.7 / 3.2 / 3.3 / 3.4 / 3.5) や、OS環境 (Linux / Mac / Windows /CygWin) に依存せずに外部コマンドを実行するコードを書きたい。

 

ワークアラウンド

なかなか一筋縄ではいかない。

Python3.2 + POSIX環境 では、コマンドラインを bytes で渡してはいけない

以下のようなエラーが出る。

>>> import subprocess
>>> subprocess.call(b'echo')
TypeError: expect bytes or str, not int

subprocess モジュールの中で、args が文字列かどうかを判定するときに bytes である場合の考慮が不足しており、bytes の個々の要素(int)に対して分割が行われてしまうため。

対策は、コマンドラインをリストとして渡せばよい。以下のように対策する。(args が bytes である場合)

import sys

if sys.version_info[:2] == (3, 2):
    args = [args]

 

Python3 + Windows環境では、コマンドラインと環境変数のキー・値を bytes で渡してはいけない

以下のようなエラーが出る。

>>> import subprocess
>>> subprocess.call(b'cmd /C echo x')
    needquote = (" " in arg) or ("\t" in arg) or not arg
TypeError: argument of type 'int' is not iterable

bytes に対する in オペレーションに失敗しているが、これといった対策が見当たらない。bytes で渡すのをやめて、str で渡すことにする。(その場合、コマンドライン文字列のエンコードは指定できない)

import sys
import six
if six.PY2 or sys.platform != 'win32':
    # この場合だけ bytes にエンコード

env の指定についても同様。キーだけではなく、値も bytes が許容されていないので注意。

 

POSIX環境 + shell=True のとき、args をリストとして渡してはいけない

リストの2番目以降の引数が評価されない。

>>> import subprocess
>>> subprocess.call(['echo', '3'], shell=True)

0

ワークアラウンドとしては、subprocess.list2cmdline を使って適切な文字列を生成する。

import sys
import subprocess
if shell and sys.platform != 'win32':
    args = [subprocess.list2cmdline(args)]

 

外部コマンドについて

 

sleep

Windows 7 以降は「timeout」というコマンドが標準装備されているが、Python から実行すると標準入力が奪われてしまい不都合だった。


結局、Python の time モジュールを使うのが一番簡便だという結論に。

import subprocess
subprocess.call('python -c "import time;time.sleep(2)"')

 

プロセステーブルの取得

11.04.2015

Python: Terminal I/O Assessment in CygWin

Python: CygWin 環境におけるターミナル I/O 周りの調査

環境

  • OS: Windows Server 2012 R2 日本語版
  • CygWin: 2.873 (64bit)
  • Python: 2.7

 

目的

Python 上で以下の操作が可能か調べる。

  • ターミナルのエンコーディングの自動判別
  • ターミナル画面のクリア
  • ターミナル上のキー入力の検知

 

バリエーション

調査したバリエーションは以下 5 種類。

環境フロントエンドシェルPythonビルドエンコーディング
1 cmd.exe - Windows cp932
2 cmd.exe CygWin bash Windows cp932
3 cmd.exe CygWin bash CygWin UTF-8
4 MinTTY CygWin bash Windows UTF-8
5 MinTTY CygWin bash CygWin UTF-8

 

環境情報

いくつかの方法で情報を取得。(要 import platform,os,sys,locale)

#メソッド環境1環境2環境3環境4環境5
a platform.system() Windows Windows CYGWIN_NT-6.3 Windows CYGWIN_NT-6.3
b os.name nt nt posix nt posix
c os.environ.get('TERM') None cygwin cygwin xterm xterm
d os.environ.get('LANG') None ja_JP.UTF-8 ja_JP.UTF-8 ja_JP.UTF-8 ja_JP.UTF-8
e sys.stdin.isatty() True True True False True
f sys.stdin.encoding cp932 cp932 UTF-8 None UTF-8
g sys.stdout.isatty() True True True False True
h sys.stdout.encoding cp932 cp932 UTF-8 None UTF-8
i locale.getpreferredencoding() cp932 cp932 UTF-8 cp932 UTF-8

明快な判定方法は無いが、sys.stdout.encoding -> 環境変数LANG -> locale.getpreferredencoding() の順に信頼すれば良さそうに思える。

 

画面クリア

以下のメソッドの挙動を確認。(要 import subprocess)

#メソッド環境1環境2環境3環境4環境5
a subprocess.call('cls', shell=True) OK OK - - -
b subprocess.call('cmd /c cls', shell=True) - - OK - -
c subprocess.call('clear', shell=False) - - - - -
d subprocess.call(['echo', '-en', r'\ec'], shell=False) - - OK - OK
e subprocess.call(r'echo -en "\ec"', shell=False) - OK - OK -

なかなか判定条件が悩ましい。

 

キー入力の取得

msvcrt による取得が可能かどうかと、tty、curses が使えるかどうか調査。

#メソッド環境1環境2環境3環境4環境5
a msvcrt.getch() OK OK - - -
b tty.setraw(0) - - OK - OK
b curses.screen#getch() - - OK - OK

環境4 に限っては、1文字単位でキー入力を取得する手段が無いという結論となった。

 

11.03.2015

How to Clear Terminal Screen in Various Systems

ターミナルの画面を消去する方法

 

特別なインストールなしで実現したい。

  • POSIX互換環境(Linux/Unix/Mac): clear
  • Windows
    • コマンドプロンプト (cmd.exe): cls
    • CygWin (2011以前): cmd /c cls
    • CygWin (mintty): echo -en "\ec"

CygWin の闇は深い。

 

 

References

11.02.2015

calendar-cli - Command-line Interface for Google Calendar

calendar-cli - Googleカレンダーを操作するためのコマンドライン・インターフェース

 

Google カレンダーをコマンドラインで操作するニーズがあったので、元々あったスクリプトを CLI として作り変えた。

これを使えば、コマンドラインで

  • 1日単位のサマリーの表示
  • 予定の登録

が、できる。

 

コード

 

セットアップ

Google の API を使う都合上、導入までに少し手間がかかる。

1. API プロジェクトの作成とclient_secret.json の入手
  • Google Developers Console を開く (要Googleアカウントのログイン)
  • 新規プロジェクトを作成
  • Calendar API を有効化
    • API と認証 -> API -> Google Apps API -> Calendar API -> API を有効にする
  • OAuth 2.0 認証のクライアントを作成 (デバイス種別は Other)
    • API と認証 -> 認証情報 -> OAuth 同意画面 -> サービス名 (calendar-cli でよい) を入力し保存
    • API と認証 -> 認証情報 -> 認証情報を追加 -> OAuth 2.0 クライアント ID
      -> アプリケーションの種類: その他として保存
  • 画面から client_secret.json をダウンロードし保存
    • 実際のファイル名はもっと長い可能性あり

認証情報 calendar cli

 

2. calendar-cli のインストール

pip で導入可能。環境によっては要sudo。

$ pip install calendar-cli
$ calendar-cli --version
calendar-cli 0.1.3
$ calendar-cli -h
Usage:
  calendar-cli [options]
                        Print a summary of events on the calendar.

  calendar-cli setup  [--read-only --credential ]
                        Generate a credentials file from the client secret.
                        You need a web browser to proceed.

  calendar-cli create [--date <YYYYMMDD> --start <HHMM> --end <HHMM>
                       --credential <credential_path>] <summary>
                        Create an event onto the calendar.


Options:
  --version             show program's version number and exit
  -h, --help            show this help message and exit
  --calendar=CALENDAR   set calendar id to CALENDAR (default:primary)
  --date=YYYYMMDD       set date to YYYYMMDD in the setup/create command
                        (default:today)
  --credential=CREDENTIAL
                        set credential path to CREDENTIAL
                        (default:/Users/user/.credentials/calendar-cli.json)
  --read-only           create a read-only credential file in the setup
                        command (default: False)
  --start=HHMM          set start time in the create command
  --end=HHMM            set end time in the create command
  --debug               enable debug logging (default: False)

 

3. 認証用ファイルの作成

calendar-cli でセットアップ用のコマンドを用意した。
client_secret.json の部分は、手順1でダウンロードした適切なパスに書き換えること。

$ calendar-cli setup client_secret.json

一度ブラウザ上で OAuth 認証が行われた後、~/.credentials/calendar-cli.json として認証情報が保存される。

 

使い方

 

1. サマリーの表示
  • 今日の予定を表示 (デフォルトのカレンダー)
    $ calendar-cli
  • 指定した日の予定を表示 (デフォルトのカレンダー)
    $ calendar-cli --date 12/6

    日付として指定可能なフォーマットは以下。年を省略すると現在の年で補完される。
    • 20151206 (YYYYmmdd)
    • 2015-12-6, 2015-12-06 (YYYY-m-d)
    • 2015/12/6, 2015/12/06 (YYYY/m/d)
    • 12-6-2015, 12-06-2015 (m-d-YYYY)
    • 12/6/2015, 12/06/2015 (m/d/YYYY)
    • 12-6, 12-06 (m-d)
    • 12/6, 12/06 (m/d)
  • 指定したカレンダーの今日の予定を表示
    $ calendar-cli --calendar xxxxxx@group.calendar.google.com
    [終日] 有給休暇 (James LaBrie)
    [11:00-12:00] ○○様来社 (John Petrucci)
    [14:00-15:00] [外出] △△訪問 (John Myung)
    [17:30-22:00] □□勉強会 (Jordan Rudess)
    [17:30-23:00] ☆☆飲み会 (Mike Mangini)

 

2. イベントの登録
  • 当日または翌日の 10:30 に 15分のイベントを作成
    $ calendar-cli create --start 10:30 内容
    イベントを作成しました: 2015-11-02 月 [10:30-10:45] 内容
    現在時刻が 10:30 以前であれば当日、以降であれば翌日にイベントが作成される。
    時刻指定のフォーマットは以下。
    • 1030 (HHMM)
    • 9:30, 09:30 (H:M)
  • 当日または翌日に 12:00-13:00 のイベントを作成
    $ calendar-cli create --start 12:00 --end 13:00 内容
    イベントを作成しました: 2015-11-02 月 [12:00-13:00] 内容
    現在時刻が 12:00 以前であれば当日、以降であれば翌日にイベントが作成される。
  • 日付を指定してイベントを作成
    $ calendar-cli create --date 12/6 --start 12:00 --end 13:00 内容
    イベントを作成しました: 2015-12-06 日 [12:00-13:00] 内容
    --date オプションのフォーマットはサマリー表示と同じ。
  • 終日イベントを作成
    $ calendar-cli create --date 12/12 内容
    イベントを作成しました: 2015-12-12 土 [終日] 内容

 

Related Posts

11.01.2015

Type Assertion Decorator in Python

Python: 型チェックのデコレータ

 

Python は動的型付けの言語だが、抽象的なライブラリを書こうとすればどうしても型チェックの需要が生じる。

  • PyContracts は非常に良さそうなライブラリだが、独自のクラスには対応していない模様。
  • なので、車輪の再発明と知りつつも自前で実装してみた。

 

コード

 

インストール

mog-commons-python の一部として実験的にリリースしている。

pip install mog-commons

inspect#getcallargs をバックポートしたので、Python 2.6 / 2.7 / 3.2 / 3.3 / 3.4 / 3.5 の全てで動く。

 

使い方

関数定義と一緒に @types デコレータを指定するだけ。

  • 1個以下の可変長引数として、戻り値の型を指定 (省略も可能)
  • 名前付き引数として、パラメータ名=型 と制約を定義
  • ListOf, DictOf などを使えば、コレクションの要素の型もチェックできる
  • チェックに失敗したら、TypeError が発生
>>> from mog_commons.types import *

>>> @types(float, x=int, y=float, z=ListOf(int))
... def f(x, y, z):
...     return x * y + sum(z)
...

>>> f(1, 2, 3)
TypeError: y must be float, not int.

>>> f(1, 2.0, 3)
TypeError: z must be list(int), not int.

>>> f(1, 2.0, [3, 4])
9.0

戻り値だけのチェックも可能。

>>> @types(bool)
... def g(x):
...     return x
...

>>> g(3)
TypeError: must return bool, not int.

>>> g(True)
True

簡便のため、Python バージョンに透過的な String, Unicode, Option も定義した。
可変長引数、名前付き引数をチェックする場合は以下のようにする。

>>> @types(args=VarArg(String), kwargs=KwArg(int))
... def h(*args, **kwargs):
...     return 'ok'
...

>>> h('abc', 5, a=123, b=456)
TypeError: args must be tuple((basestring|str)), not tuple.

>>> h('abc', 'def', a=123, b=456)
'ok'

実際にどの要素の型が異なっているのか、までは表示していない。

 

これで、Typesafe Python にまた一歩近づいた。

10.24.2015

Launching Windows Server in Amazon Web Services

AWS で Windows サーバ (EC2) を起動する

 

インスタンスの作成

  • AWSマネジメントコンソール にサインイン
  • サービス -> EC2
  • 画面右上のメニューからリージョンを選択: 今回は節約のため「米国東部 (バージニア北部)」
  • EC2 ダッシュボード -> インスタンスの作成

 

ステップ 1: Amazon マシンイメージ (AMI)
  • クイックスタート -> Microsoft Windows Server 2012 R2 Base (64ビット) を選択
    • 日本語版を使用したい場合は、コミュニティ AMI -> 「2012-R2 Japanese Base」などで検索して見つかったものを選択
ステップ 2: インスタンスタイプの選択
  • t2.micro を選択し、「次の手順: インスタンスの詳細の設定」をクリック
ステップ 3: インスタンスの詳細の設定
  • デフォルトのまま、「次の手順: ストレージの追加」をクリック
ステップ 4: ストレージの追加
  • ルートディスクが 30GB 以上でないと作成に失敗する。
  • 「次の手順: インスタンスのタグ付け」をクリック
ステップ 5: インスタンスのタグ付け
  • キー: Name
  • 値: win2012-1 (適宜設定)
  • 「次の手順: セキュリティグループの設定」をクリック
ステップ 6: セキュリティグループの設定
  • セキュリティグループの割り当て: 新しいセキュリティグループを作成する
  • セキュリティグループ名: security-win
  • 説明: Security group for Windows servers
  • タイプ: RDP (プロトコル: TCP, ポート範囲: 3389)
  • 送信元: マイIP
  • 「確認と作成」をクリック
ステップ 7: インスタンス作成の確認
  • 「作成」をクリック
  • 既存のキーペアを選択するか、新しいキーペアを作成します。
    • 新しいキーペアの作成
    • キーペア名: aws-win (適宜設定)
    • 「キーペアのダウンロード」をクリック
    • aws-win.pem をダウンロードし保存
      $ mv -i ~/Downloads/aws-win.pem ~/.ssh/
      $ chmod 600 ~/.ssh/aws-win.pem
  • 「インスタンスの作成」をクリック
  • 作成処理に成功したら、「インスタンスの表示」をクリック

 

パスワードの入手

  • 起動してから数分後、インスタンス一覧の画面で Windows サーバを右クリックし
    「Windows パスワードの取得」を選択

  • キーペアのパス: aws-win.pem の保存先を選択
  • 「パスワードの暗号化」をクリック
  • 画面に以下の情報が表示される
    • パブリック IP
    • ユーザー名
    • パスワード

 

リモートデスクトップ接続

  • Mac から Windows に接続する場合、まず App Store から Microsoft Remote Desktop をインストールする
  • アプリ起動後、New で新規接続情報を入力
    • Connection name: win2012-1
    • PC name: (先程入手したパブリックIP)
    • User name: Administrator
    • Password: (先程入手したパスワード)
    • Start session in full screen: チェックを外す (お好みで)
    • Use all monitors: チェックを外す
      (デュアルディスプレイの場合に、Windows側 も 2画面になってしまうのを防ぐ)
  • 接続時、Verify Certificate のウィンドウが出たら、
    Show Certificate -> Always trust ... にチェックを付けてから Continue をクリック
  • フルスクリーンの切り替えは Command+1 でできる

Fullscreen 10 24 15 7 09 PM

 

Windows 上で Python をセットアップしてみる

  • Internet Explorer を起動し、Python 公式サイトからインストーラーをダウンロードし実行。
  • PowerShell 上でパスを通す
    setx PATH $Env:PATH";C:\Python27;C:\Python27\Scripts"
  • その後、PowerShell の落とし上げをすれば、python や pip が使えるようになる。

 

数百円で、すぐに使える Windows 環境が手に入るのは本当に素晴らしい。

10.21.2015

Running Windows-OS CI for Python Projects on AppVeyor

AppVeyor を使って Windows 環境で Python プロジェクトの CI を回す

 

easy-menu を AppVeyor の CI に載せたので簡単にメモ。

AppVeyor とは

Windows OS (デフォルトは Windows Server 2012) 環境で CI を実行してくれるサービス。

OSS は無料で制限なく使えるので、Windows 版 Travis CI のようなものと考えてよい。

セットアップ

プロジェクト追加

GitHub のアカウントで認証し、画面からプロジェクトを追加するだけ。

appveyor.yml

以下のリポジトリから、appveyor.yml と appveyor ディレクトリ配下のスクリプトを拝借した。

基本的にそのままで動くが、バリエーションが多すぎるので適宜減らすとよい。
(最初、appv「a」yor.yml と typo していたため全く機能せず、焦った)

ハマりどころ

mock インストール失敗

環境の setuptools が古いせいか、mock ライブラリのインストールで失敗した。

ワークアラウンドとして、'mock == 1.0.1' とバージョンを固定したら通るようになった。

そもそもテストが通らない

OS に対して透過的なテストを書くことは、本当に難しい。

9.19.2015

Python: How to Implement Thread-Safe Auto-Increment in Redis

Python: Redis上でスレッドセーフなオート・インクリメントを実現する

 

目的

Redisで以下のような3種類のDBを使い、色々な名前の登録処理をしたい。

  • 名前 -> ID の検索テーブル
  • ID -> 名前 の検索テーブル
  • 特定の key で ID の登録数 (=払い出したIDの最大値) を保持するテーブル

ID は 1 を起点とするオート・インクリメントなもので、名前ごとにユニークな数値を割り当てる。

 

インターフェース

1個の Redis インスタンスの中にある 3つの DBを使用する想定。それぞれのDB番号とカウンターとして使用するキーの名前を指定して初期化。

register メソッドは文字列として name を受け取り、その name に対応するユニークな ID を返す。name が DB に登録されていない場合のみ、登録処理を行う。

import redis


class Registerer(object):
    def __init__(self, counter_key, db_counter, db, db_invert=None,
                 host='localhost', port=6379):
        self.counter_key = counter_key
        self.db_counter = db_counter
        self.db = db
        self.db_invert = db_invert
        self.host = host
        self.port = port
        self.redis = redis.Redis(host, port, db)
        self.redis_cnt = redis.Redis(host, port, db_counter)
        self.redis_inv = (redis.Redis(host, port, db_invert)
                          if db_invert is not None else None)

    def register(self, name):
        return ???

 

テスト

マルチスレッド・プログラミングをする時はテストがないと不安なので、先にテストを書く。
(実行すると、ローカルの Redis のデータは全て消える)

#!/usr/bin/env python
import unittest
import threading

from repos.registerer import Registerer


class TestRegisterer(unittest.TestCase):
    def _clear(self):
        self.r.redis.flushall()

    def setUp(self):
        self.num_records = 10000
        self.r = Registerer('count', 15, 14, 13)
        self._clear()

    def tearDown(self):
        self._clear()

    def test_register_serial(self):
        r = self.r
        for i in range(self.num_records):
            r.register('user-%04d' % i)

        for i in range(10):
            r.register('user-%04d' % i)

        self.assertEqual(r.redis.dbsize(), self.num_records)
        self.assertEqual(r.redis_inv.dbsize(), self.num_records)
        self.assertEqual(int(r.redis_cnt.get('count')), self.num_records)

        for i in range(100):
            self.assertEqual(int(r.redis.get(r.redis_inv.get(i * 50 + 1))), i * 50 + 1)
            self.assertEqual(r.redis_inv.get(i * 50 + 1), 'user-%04d' % (i * 50))

    def test_register_parallel(self):
        r = self.r
        threads = [threading.Thread(target=r.register, args=('user-%04d' % i,)) for i in range(10000)]

        for t in threads:
            t.start()

        for t in threads:
            t.join()

        self.assertEqual(r.redis.dbsize(), self.num_records)
        self.assertEqual(r.redis_inv.dbsize(), self.num_records)
        self.assertEqual(int(r.redis_cnt.get('count')), self.num_records)

        for i in range(100):
            self.assertEqual(int(r.redis.get(r.redis_inv.get(i * 50 + 1))), i * 50 + 1)

            # this will fail
            # self.assertEqual(r.invert_redis.get(i * 50 + 1), 'user-%04d' % (i * 50))

    def test_register_parallel_same_id(self):
        r = self.r
        threads = [threading.Thread(target=r.register, args=('user-0000',)) for _ in range(self.num_records)]

        for t in threads:
            t.start()

        for t in threads:
            t.join()

        self.assertEqual(r.redis.dbsize(), 1)
        self.assertEqual(r.redis_inv.dbsize(), 1)
        self.assertEqual(int(r.redis_cnt.get('count')), 1)
        self.assertEqual(int(r.redis.get('user-0000')), 1)
        self.assertEqual(r.redis_inv.get(1), 'user-0000')


if __name__ == '__main__':
    unittest.main()

 

ナイーブな実装 (問題あり)

名前 -> ID のテーブルを検索し、値が取得できなかったら Redis の INCR コマンドで値を更新し、その ID を正引き/逆引きのテーブルそれぞれ登録する。

念の為に SETNX コマンドを利用し、キーが既に存在していたら例外を送出するようにする。

    def register_naive(self, name):
        index = self.redis.get(name)
        if index is None:
            index = self.redis_cnt.incr(self.counter_key)
            if not self.redis.setnx(name, index):
                raise Exception(
                    'Failed to register: db=%d, key=%s, value=%s' %
                    (self.db, name, index))
            if self.db_invert is not None:
                if not self.redis_inv.setnx(index, name):
                    raise Exception(
                        'Failed to register: db=%d, key=%s, value=%s' %
                        (self.db_invert, index, name))
        return int(index)

Redis の INCR はアトミックな処理であるし、一見問題はなさそうだが、先程書いたテストが失敗する。

.Exception in thread Thread-10001:
Traceback (most recent call last):
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/xxxxxx/registerer.py", line 36, in register_naive
    raise Exception('Failed to register: db=%d, key=%s, value=%s' % (self.db, name, index))
Exception: Failed to register: db=14, key=user-0000, value=1

F.
======================================================================
FAIL: test_register_parallel_same_id (__main__.TestRegisterer)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "./tests/test_registerer.py", line 72, in test_register_parallel_same_id
    self.assertEqual(int(r.redis_cnt.get('count')), 1)
AssertionError: 2 != 1

----------------------------------------------------------------------
Ran 3 tests in 9.492s

FAILED (failures=1)

これは、同じ名前を同時に大量に登録した場合にのみ発生する。
名前 -> IDテーブルのチェックと更新の間のタイミングで競合状態が発生し、同じ名前の登録処理が複数実行されているためである。

 

トランザクションを利用した実装

redis-py には transaction というお誂え向きのヘルパーメソッドが用意されている。

パイプラインを引数に取る関数と、楽観的ロックをかけるキーを指定するだけでシンプルにトランザクションを記述できる。

最終的なコードは以下のようになった。

import redis


class Registerer(object):
    def __init__(self, counter_key, db_counter, db, db_invert=None,
                 host='localhost', port=6379):
        self.counter_key = counter_key
        self.db_counter = db_counter
        self.db = db
        self.db_invert = db_invert
        self.host = host
        self.port = port
        self.redis = redis.Redis(host, port, db)
        self.redis_cnt = redis.Redis(host, port, db_counter)
        self.redis_inv = (redis.Redis(host, port, db_invert)
                          if db_invert is not None else None)

    def register(self, name):
        def f(pipe):
            index = self.redis.get(name)
            if index is None:
                index = pipe.incr(self.counter_key)
                if not self.redis.setnx(name, index):
                    raise Exception(
                        'Failed to register: db=%d, key=%s, value=%s' %
                        (self.db, name, index))
                if self.db_invert is not None:
                    if not self.redis_inv.setnx(index, name):
                        raise Exception(
                            'Failed to register: db=%d, key=%s, value=%s' %
                            (self.db_invert, index, name))
            return int(index)

        return self.redis_cnt.transaction(
            f, self.counter_key, value_from_callable=True)

 

無事、テストも通った。

...
----------------------------------------------------------------------
Ran 3 tests in 13.818s

OK

 

References

9.18.2015

Image Conversion Cheat Sheet

画像変換チートシート

 

コマンドラインで画像変換を完結するためのメモ。

9.02.2015

Getting Started with Apache Spark Cluster and GraphX

Spark クラスタを 10分 で構築して GraphX を試してみる

 

ローカルモードで動かした後の次のステップ

前提条件

  • OS: Mac or Linux
  • JDK: 7+

 

インストール

  • Downloads | Apache Spark
    • 既に構築済みの Hadoop クラスタがあれば、それに応じたパッケージタイプを選択。
    • 「Pre-built for Hadoop 2.6 and later」あたりを選べば無難。
    • Scala 2.11 を使いたい場合はソースからのビルドが必要。
  • パッケージを選択した場合、ダウンロードしたファイルを展開すればすぐに使える。
  • Spark 一式は /usr/local/share/spark として使う。(以降、適当に読み替えること)
  • log4j.properties ファイルを作成して、不要な INFO ログを抑止する。

実行例

# ### ダウンロードと展開、移動
# curl -O http://d3kbcqa49mib13.cloudfront.net/spark-1.4.1-bin-hadoop2.6.tgz
# tar zxf ./spark-1.4.1-bin-hadoop2.6.tgz
# mv -i ./spark-1.4.1-bin-hadoop2.6 /usr/local/share/spark-1.4.1
#
# ### シンボリックリンクを作成
# ln -s /usr/local/share/spark-1.4.1 /usr/local/share/spark
#
# ### ロギング設定
# cd /usr/local/share/spark
# cp -pi ./conf/log4j.properties.template ./conf/log4j.properties
# vi ./conf/log4j.properties
# diff ./conf/log4j.properties.template ./conf/log4j.properties
2c2
< log4j.rootCategory=INFO, console
---
> log4j.rootCategory=WARN, console

 

起動

Hadoop, HDFS が使えなくても Spark の利用は可能。

クラスタの管理サービスにはいくつか選択肢がある。

  • Amazon EC2
  • Standalone Deploy Mode
  • Apache Mesos
  • Hadoop YARN

今回は、一番簡単な Standalone Deploy Mode を試す。

  • マスター起動
    # /usr/local/share/spark/sbin/start-master.sh
  • スレーブ起動
    # /usr/local/share/spark/sbin/start-slave.sh "spark://マスターのIPアドレス:7077"
    マスターのIPアドレスの箇所はホスト名でもよいが、localhost, 127.0.0.1 はデフォルトで接続が拒否されている模様。
    マスターとの同居も可。

 

動作確認

  • ログ
    • /usr/local/share/spark/logs 配下に master, worker それぞれのログファイルが作成される。
  • GUI
    • http://マスターのIPアドレス:8080
      Spark Master at spark xxxxxxxx 7077

 

build.sbt

必要最低限のものだけ。

name := "your-app-name"

version := "0.1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.spark" %% "spark-graphx" % "1.4.1"
)

 

データ

@teppei_tosa さんのチュートリアルに従う。

1 2
2 3
3 1

 

ソースコード

import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object Main {
  def main(args: Array[String]) = {
    val conf = new SparkConf().setMaster("spark://MASTER_IP:7077").setAppName("My Application")
    val sc = new SparkContext(conf)

    val g = GraphLoader.edgeListFile(sc, "path/to/edge_list.txt").cache()

    g.vertices.collect().foreach(println(_))
    g.edges.collect().foreach(println(_))
    sc.stop()
  }
}

実行

$ sbt run
(snip)

(3,1)
(1,1)
(2,1)

(snip)

Edge(1,2,1)
Edge(2,3,1)
Edge(3,1,1)

INFO ログが大量に出て見にくいが、無事グラフの情報が画面に出力された。

Python3.2: TypeError in subprocess Module When Using Bytes Command String

Python3.2: subprocess モジュールでコマンドラインを bytes で書くと TypeError が発生

 

事象

POSIX互換環境の Python 3.2 で subprocess#call を呼び出すとき
コマンドを bytes で書きつつ shell モードを有効にすると、以下の意味不明なエラーが出る。

$ python
Python 3.2.5 (default, Sep  1 2015, 23:07:07)
[GCC 4.2.1 Compatible Apple LLVM 6.1.0 (clang-602.0.53)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import subprocess
>>> subprocess.call(b'echo', shell=True)
(snip)
TypeError: Can't convert 'int' object to str implicitly
>>>

コマンドを str で渡せばよいのだが、それだとコマンドライン自体が SJIS などの非UTF-8で表現されている場合に困ってしまう。

 

原因

子プロセスを生成するところで、args に ['/bin/sh', '-c', 101, 99, 104, 111] というパラメータが渡っていた。
['/bin/sh', '-c'] + list(b'echo') のようなコードで作られたようだ。

1バイトごとに分解され int となったものが暗黙的に str に変換されずに TypeError が送出される。

Python 3.3 では修正済み。

 

対応方法

Python 3.2 をサポートしないのが賢明と思われるが、以下のようなワークアラウンドでも回避できそうだ。

import subprocess


NOT_USE_SHELL = sys.version_info[:2] == (3, 2) and not sys.platform == 'win32'

cmd = b'echo'

if NOT_USE_SHELL:
    ret_code = subprocess.call(['/bin/sh', '-c', cmd], shell=False)
else:
    ret_code = subprocess.call(cmd, shell=True)

 

話題になったのが 5年以上前ということもあり、Google検索しても情報が出てこないのがつらい。

8.31.2015

SSH Cheat Sheet

SSH チートシート

 

たまに使うけど忘れがちなコマンド & tips についてメモ。

Easy Menu 1.0 is Now Released!

新しく生まれ変わった Easy Menu をお試しください

 

Easy Menu とは

Easy Menu とは、設定ファイルを元にターミナル上で動作するコマンドランチャーを立ち上げるだけの、とてもシンプルなツールです。

実に 2 年以上の間ずっと放置していたのですが意外と社内で好評でしたので、この際リニューアルしてみました。

  • 動作イメージ

 

すぐに試す

今回、パッケージを PyPI に登録することができたので、インストールがとても簡単になりました。

設定ファイルを HTTP(S) 経由で取得すれば、たった 2行のコマンドで Easy Menu をお試しいただけます。

注意: 2015-08-31時点では、まだ Python 3.x系には対応していません
=> 2015-09-02 にリリースした v1.0.2 で Python 3.2/3.3/3.4 に対応しました

pip install easy-menu
easy-menu http://git.io/vGWla

 

主な起動方法
  • easy-menu
    • パラメータを付けずにコマンドを実行した場合、カレントディレクトリから上位に向かって easy-menu.yml が存在するかチェックしていきます。そして最初に見つかった設定ファイルの内容がロードされます。 (Vagrant における Vagrantfile の検索と同じような動きです)
    • 環境変数 EASY_MENU_CONFIG を設定することで、検索対象のファイル名
      (デフォルトは easy-menu.yml) を変更することもできます。
    • プロジェクトのルートディレクトリに easy-menu.yml を置いておくと捗ります。
  • easy-menu <設定ファイルのパス>
    • もちろん、設定ファイルの場所を直接指定することもできます。
      尚、コマンド実行時には、設定ファイルの存在するディレクトリにカレントディレクトリを移動してからコマンドが実行されます。
  • easy-menu <設定ファイルのURL>
    • http(s) により設定ファイルをネットワーク上から取得します。
    • スキームは http, https のみ対応。省略はできません。

 

新機能

 

1行のコマンドでインストール

前述のとおり、pip コマンドでインストールできるようになりました。

YAML形式のサポート

これまでは JSON のみのサポートでしたが、YAML にも対応しました。

別ファイルのインクルード

"include" というキーワードを使うことで、別のファイルのメニューをマージすることが可能になりました。

設定イメージ:

---
メインメニュー:
  - include: common.yml
  - サブメニュー:
    - コマンド3: echo 3
    - コマンド4: echo 4
---
共通メニュー:
  - コマンド1: echo 1
  - コマンド2: echo 2

この場合、以下のようにメニューが生成されます。

---
メインメニュー:
  - 共通メニュー:
    - コマンド1: echo 1
    - コマンド2: echo 2
  - サブメニュー:
    - コマンド3: echo 3
    - コマンド4: echo 4

 

動的なメニュー生成

"eval" というキーワードを使うことで、任意のコマンドをメニュー生成時に実行し、その標準出力を YAML 形式の設定として読み込むことが可能になりました。

設定イメージ:

---
メインメニュー:
  - eval: scripts/create_ec2_menu.sh

この場合、シェルスクリプト create_ec2_menu.sh の出力がそのままメニューの内容として反映されます。

例えば、AWS で起動中の EC2 インスタンスの一覧を動的に取得するような使い方を想定しています。

国際化対応

現状、対応しているのは英語と日本語のみです。

 

以上、簡単な機能の紹介でした。
不具合、要望などがありましたら、お気軽に GitHub Issue に登録をお願いします。

8.23.2015

Running Apache Spark Cluster by using Kubernetes

Kubernetes を使って Spark クラスタを立ち上げる

 

環境

 

インストール

 

Kubernetes のインストール (NG)

適当なプロジェクト用ディレクトリに移動し、以下のコマンドを実行。

$ export KUBERNETES_PROVIDER=vagrant
$ curl -sS https://get.k8s.io | bash

早速コケた。

Downloading kubernetes release v1.0.3 to /proj/mogproject/example-spark/kubernetes.tar.gz
--2015-08-23 10:39:46--  https://storage.googleapis.com/kubernetes-release/release/v1.0.3/kubernetes.tar.gz
Resolving storage.googleapis.com... 216.58.220.176
Connecting to storage.googleapis.com|216.58.220.176|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 121767360 (116M) [application/x-tar]
Saving to: ‘kubernetes.tar.gz’

100%[===============================================================================>] 121,767,360 1.76MB/s   in 65s

2015-08-23 10:40:51 (1.80 MB/s) - ‘kubernetes.tar.gz’ saved [121767360/121767360]

Unpacking kubernetes release v1.0.3
Creating a kubernetes on vagrant...
Starting cluster using provider: vagrant
... calling verify-prereqs
... calling kube-up
Using credentials: vagrant:vagrant
Bringing machine 'master' up with 'virtualbox' provider...
Bringing machine 'minion-1' up with 'virtualbox' provider...
==> master: Box 'kube-fedora21' could not be found. Attempting to find and install...
    master: Box Provider: virtualbox
    master: Box Version: >= 0
Request for box's Amazon S3 region was denied.

This usually indicates that your user account with access key ID



is misconfigured. Ensure your IAM policy allows the "s3:GetBucketLocation"
action for your bucket:

    arn:aws:s3:::opscode-vm-bento

どうやら vagrant up コマンドに失敗している模様。
認証周りを管理しているプラグイン vagrant-s3auth をアンインストールしたら、先に進んだ。

$ vagrant plugin uninstall vagrant-s3auth

しかし、今度はこの画面から一向に進まない。

Waiting for each minion to be registered with cloud provider
..................................................................

1時間待っても、2時間待っても状況変わらず。

どうやら、こちらの certificate 関連の厄介な問題に直面したようだ。

解決策は、1個前のバージョンである v1.0.1 にこちらのパッチを適用し、手動でインストールせよ、とのこと。

 

Kubernetes のインストール (リトライ)

まずは VM を初期化。

$ cd kubernetes
$ vagrant destroy -f
$ vagrant box remove kube-fedora21
$ cd ..
$ rm -rf ./kubernetes

Version 1.0.1 を手動でダウンロードしてから、パッチを適用。その後、インストール。

$ curl -LO https://github.com/kubernetes/kubernetes/releases/download/v1.0.1/kubernetes.tar.gz
$ tar zxvf ./kubernetes.tar.gz
$ cd kubernetes/cluster/vagrant
$ curl -O https://raw.githubusercontent.com/kubernetes/kubernetes/release-1.0/cluster/vagrant/provision-master.sh
$ curl -O https://raw.githubusercontent.com/kubernetes/kubernetes/release-1.0/cluster/vagrant/provision-minion.sh
$ cd ../..
$ export KUBERNETES_PROVIDER=vagrant
$ ./cluster/kube-up.sh

10分ほどで正常終了。ここまで長かった。

Cluster validation succeeded
Done, listing cluster services:

Kubernetes master is running at https://10.245.1.2
KubeDNS is running at https://10.245.1.2/api/v1/proxy/namespaces/kube-system/services/kube-dns
KubeUI is running at https://10.245.1.2/api/v1/proxy/namespaces/kube-system/services/kube-ui

API との疎通確認

$ ./cluster/kubectl.sh get pods
NAME      READY     STATUS    RESTARTS   AGE

 

Spark クラスタの起動

Spark マスターサービスの起動

$ ./cluster/kubectl.sh create -f ./examples/spark/spark-master.json
$ ./cluster/kubectl.sh create -f ./examples/spark/spark-master-service.json

暫く経つと Running になる。

$ ./cluster/kubectl.sh get pods
NAME           READY     STATUS    RESTARTS   AGE
spark-master   0/1       Pending   0          1m
$ ./cluster/kubectl.sh get pods
NAME           READY     STATUS    RESTARTS   AGE
spark-master   1/1       Running   0          15m
$ ./cluster/kubectl.sh logs spark-master

Spark ワーカーの起動

$ ./cluster/kubectl.sh create -f examples/spark/spark-worker-controller.json

15分経っても、3個中2個は Pending のままだった。

$ ./cluster/kubectl.sh get pods
NAME                            READY     STATUS    RESTARTS   AGE
spark-master                    1/1       Running   0          34m
spark-worker-controller-6cgpd   0/1       Pending   0          15m
spark-worker-controller-tqa4b   0/1       Pending   0          15m
spark-worker-controller-yl7n2   1/1       Running   0          15m
$ ./cluster/kubectl.sh get services
NAME           LABELS                                    SELECTOR            IP(S)           PORT(S)
kubernetes     component=apiserver,provider=kubernetes                 10.247.0.1      443/TCP
spark-master   name=spark-master                         name=spark-master   10.247.70.164   7077/TCP
$ ./cluster/kubectl.sh get nodes
NAME         LABELS                              STATUS
10.245.1.3   kubernetes.io/hostname=10.245.1.3   Ready

 

Spark を使う

spark-master の IPアドレスとポートを確認。

$ ./cluster/kubectl.sh get service spark-master
NAME           LABELS              SELECTOR            IP(S)           PORT(S)
spark-master   name=spark-master   name=spark-master   10.247.70.164   7077/TCP

Kubernetes の minion-1 にログインし、適当な Docker コンテナを起動。
その中で、環境変数や /etc/hosts ファイルを書き換えるユーティリティ用スクリプト setup_client.sh を(同一シェル内で)実行。

[vagrant@kubernetes-minion-1 ~]$ sudo docker run -it gcr.io/google_containers/spark-base
root@49cc6b98cb5f:/# . /setup_client.sh 10.247.70.164 7077

Kubernetes 上の Spark クラスタへアクセスしていることを確認できた。

root@49cc6b98cb5f:/# spark-shell
15/08/23 11:04:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_79)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
15/08/23 11:04:31 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/08/23 11:04:33 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/08/23 11:04:43 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 0.13.1aa
SQL context available as sqlContext.

scala> sc.isLocal
res0: Boolean = false

scala> sc.master
res1: String = spark://spark-master:7077

Python の API で、Spark ワーカーのホスト名を取得してみる。

root@49cc6b98cb5f:/# pyspark
Python 2.7.9 (default, Mar  1 2015, 12:57:24)
[GCC 4.9.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
15/08/23 11:12:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Python version 2.7.9 (default, Mar  1 2015 12:57:24)
SparkContext available as sc, HiveContext available as sqlContext.
>>> import socket
>>> sc.parallelize(range(1000)).map(lambda x:socket.gethostname()).distinct().collect()
['spark-worker-controller-yl7n2']

 

References

Python: Getting Started with NetworkX Graph Generator

NetworkX のグラフジェネレーターで遊ぶ

 

out-of-the-box な機能が非常に豊富で感動した。

せっかくなので、IPython Notebook でビジュアライズしてみる。

 

準備

全て必須という訳ではないが、一応インストールしておく。

pip install ipython jinja2 tornado pyzmq numpy scipy pylab matplotlib
pip install networkx

作業ディレクトリに移動して、IPython Notebook 起動。

ipython notebook

ブラウザ上で操作し、 New Notebook を作成。

Home

 

実行例

コードの補完もできる。

NetworkXGeneratorExample

ひととおりグラフを作ったあと保存し、そのファイルを Gist にアップロードしてみた。
すると以下のとおり、簡単にコードとイメージを共有することができる。

このカジュアルさに、また感動。Python 素晴らしい。

8.22.2015

Writing Set of Sets in Python

Python: set を要素に持つ set を書く

 

普通に書くと、unhashable type (ハッシュ化できないデータ型) だと怒られる。

>>> {set(), {1}, {1, 2}}
Traceback (most recent call last):
  File "", line 1, in
TypeError: unhashable type: 'set'

内側の set(集合) に frozenset を使えばよい。

>>> {frozenset(), frozenset({1}), frozenset({1, 2})}
{frozenset({1, 2}), frozenset(), frozenset({1})}

fronzenset はイミュータブルなので、後から変更を加えることはできない。

>>> s = frozenset()
>>> s.add(1)
Traceback (most recent call last):
  File "", line 1, in
AttributeError: 'frozenset' object has no attribute 'add'

 

 

References

8.10.2015

Shell: File Override in Safety

Shell: 安全にファイルの内容を入力と同時に書き換える

 

同僚に教えてもらった。

一見、本当に安全なのかわかりづらいが、結果、大丈夫そう。

(rm -f -- "${FILEPATH}" && COMMAND > "${FILEPATH}") < "${FILEPATH}"

これで、一時ファイルなしでファイルを入力しながら書き出すことが可能になる。(inode は変わる)

8.09.2015

CircleCI Failed After Restoring Cache Files

CircleCI がキャッシュ復元処理の後で失敗した

 

事象

CircleCI を回していたら、Restore cache の後でエラー終了となってしまった。

画面には以下のようなメッセージが出現している。

Looks like we had a bug in our infrastructure, or that of our providers (generally GitHub or AWS) We should have automatically retried this build. We've been alerted of the issue and are almost certainly looking into it, please contact us if you're interested in the cause of the problem.

 

原因

チャットでサポートに連絡したら、なんと 30分で回答が返ってきた。

We currently can’t cache files outside the home directory. E.g. they will be cached, but the build will err out on restoring the cache. I see that you are caching a few files in /usr/local, would that be an option for you to store them in the home directory, cache them and then symlink them into the right place?

実は今回、circle.yml で /usr/local 配下のあるディレクトリをキャッシュ対象として指定していた。
現状ではホームディレクトリ以外の場所はキャッシュできないようだ。

キャッシュ保存時は成功したように見えるが、復元の時点でエラーが発生するとのこと。

対応策としては、ホームディレクトリ配下にキャッシュ用のディレクトリを作って、そこにシンボリックリンクを張るとよいと教えてもらった。
後日対応し、問題なく稼動している。

dependencies:
  pre:
    - sudo mkdir -p yyyyy /usr/local/lib/xxxxx
    - sudo ln -s $PWD/yyyyy /usr/local/lib/xxxxx/yyyyy

 

StackShare でもカスタマーサポートが良いと好評だが、それを実感することができてよかった。

 

CircleCI については、こんなものも書きました。

7.27.2015

Python unittest Cheat Sheet

Python: unittest のチートシート

よく使うイディオムをまとめておく。

  • Python 2.6 に対応するには、unittest2 を使うのが一番簡単

7.20.2015

Dragon Quest 5 (PS2) Time Attack - 2:53

PS2版ドラゴンクエスト5のタイムアタック

 

遠い昔、プレイステーション2 版の ドラゴンクエスト5 で遊んでいた頃のノートがつい先日出てきた。

折角なので、Github にラップタイムをまとめておく。

7.12.2015

Safe and Smart Memoization in C++ (or Python)

エレガントなメモ化のやり方

 

同僚から教えてもらったテクニックを忘れそうになったのでメモ。

#include <cstdio>

int memo[100][100];

int main() {
  memset(memo, -1, sizeof memo);
  // call f with parameters
  f(???, ???)
}

int f(int x, int y) {
  int& res = memo[x][y];
  if (res < 0) {
    // do the work when the value has not been calculated
    res = ???
  }
  return res;
}

適当なn次配列をメモ化の領域として使い、1のビット(-1)で初期化するところまでは一般的なやり方。

ポイントは、実際にメモ化したデータを使う部分。
はじめに配列の要素の参照を取り、初期状態でない場合にのみその値を更新する。
いずれのケースでも最後にその参照を返す。

これで処理の流れがシンプルになるし、メモの更新し忘れを防ぐこともできる。

同様に、Python で値が None で初期化されているならこんな感じ。

def f():
    if cache is None:
        # do the work
        cache = ???
    return cache

6.28.2015

Ansible: Installing Graphite + Grafana 2

Ansible: Graphite + Grafana 2系をインストールする Playbook

 

以前作った Playbook が最新の Amazon Linux に対応できなくなっていたので更新。

主な対応内容

  • Amazon Linux のデフォルトの Python バージョンが、いつの間にか 2.7 に上がっていた。
    yum でインストールされる Python ライブラリは 2.6 向けにパッケージングされているので使えない。
  • インストールパスを標準準拠に。カスタマイズ箇所を最小限に留めるよう意識した。
  • MySQL の使用もやめた
  • Grafana 2系の導入
    • WebサービスやDBが内包されている。つまり Nginx, ElasticSearch も不要になった

セットアップ手順

README 参照。

色々ハマリどころも多かったが、いったん Playbook に反映。プロビジョニングしてすぐにグラフが作れるようになった。

 

s

S

 

ただし、Graphite はやはり依存関係が複雑で保守性が悪いので、今後は Grafana + InfluxDB の組み合わせを選択するのが主流になりそう。

 

References

Troubleshooting Guide for SSH Connection Error

SSH接続エラーのトラブルシューティング情報まとめ

 

SSH の接続に失敗する場合の対処方法の自分用メモ。随時更新していく。

6.22.2015

Counting True in the Python List

Python: リスト内の True の個数を数える

 

sum を使えば、True: 1, False: 0 としてカウントされるので便利。

>>> [False, True, True, False, True].count(True)
3
>>> sum([False, True, True, False, True])
3
>>> sum(x % 2 == 1 for x in range(5))
2

 

 

References

How to Internationalize Messages in Python

Python アプリケーション内のメッセージの国際化 (i18n) について

 

思ったよりも面倒だったのでメモ。

 

tl;dr

  • GNU gettext API よりも クラスベース API が推奨されている
  • ロケール辞書の置き場所が必要
  • 翻訳対象のメッセージにマーク _('text') を付けてアプリケーション・コードを書く
  • ソースコードからカタログ(.po)を作成する。
    これを実現するツールは lingua, babel, po-utils, xpot, xgettext など多数ある模様。
  • カタログを編集して翻訳後の文字列を記述し、コンパイルしてバイナリファイル(.mo)を作成。
    そして所定の位置に配備する。

 

具体例

 

ソースコード
import gettext

_ = gettext.translation('hello_i18n', 'locale', fallback=True).ugettext

print(_('hello i18n'))
print(_('hello %(world)s') % {'world': 'i18n'})

 

カタログの作成

今回は、xgettext + msgfmt で実現する。

### Macの場合
$ brew install gettext

### バージョンは環境に合わせて適宜変更
$ /usr/local/Cellar/gettext/0.19.4/bin/xgettext --language=python --from-code=UTF-8 \
--keyword=_ --add-comments=NOTE -o hello_i18n.po hello_i18n.py

$ vi hello_i18n.po
# SOME DESCRIPTIVE TITLE.
# Copyright (C) YEAR THE PACKAGE'S COPYRIGHT HOLDER
# This file is distributed under the same license as the PACKAGE package.
# FIRST AUTHOR , YEAR.
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2015-06-21 23:48+0900\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"Language: \n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"

#: hello_i18n.py:5
msgid "hello i18n"
msgstr "こんにちは i18n"

#: hello_i18n.py:6
#, python-format
msgid "hello %(world)s"
msgstr "%(world)s、こんにちは"

 

カタログのコンパイル
$ mkdir -p locale/ja_JP/LC_MESSAGES
$ /usr/local/Cellar/gettext/0.19.4/bin/msgfmt -o locale/ja_JP/LC_MESSAGES/hello_i18n.mo \
./hello_i18n.po

 

実行結果
$ python2 ./hello_i18n.py
こんにちは i18n
i18n、こんにちは
$ LC_ALL=C python2 ./hello_i18n.py
hello i18n
hello i18n

 

GNU gettext との結び付きが強すぎるし、コンパイルが必須なのもあって、あまり使う気がしない設計という印象。

 

References

6.09.2015

HyperLogLog Implementation in Redis, pt.4

Redis: HyperLogLog の実装について その4

 

その他のトピック。

 

要素の追加

追加する要素のハッシュを求め、インデックスと base-2 rank を求める。
各インデックスごとにより大きい値をレジスタに保存するだけ。

 

データのマージ

$2^{14}$ 個の全てのレジスタを走査し、各位置に対して値の大きいほうを保持する。
非常に高速なマージ処理も HyperLogLog の特長である。

 

dense 表現について

dense 表現では、sparse 表現と異なり、全部のインデックスの値が配列的に保持されている。

Redis では以下のような構造のバイト配列を定義することで、メモリ使用量を圧縮している。
(各レジスタの値は 0〜50 の範囲内であるため、それぞれ 6 ビットで表現可能)

 * +--------+--------+--------+------//
 * |11000000|22221111|33333322|55444444
 * +--------+--------+--------+------//

sparse から dense へ変換が行われるタイミングは以下。

  • PFADD
    • 追加する要素の base-2 rank が 33 以上だった場合 (sparse では表現不可)
    • sparse 表現の使用サイズが サーバ変数 server.hll_sparse_max_bytes を超えた場合
      (デフォルトは 3000)

  • PFMERGE: マージ実行後は必ず dense 表現になる
  • PFDEBUG TODENSE key: デバッグ用のコマンド

 

 

Related Posts

6.07.2015

HyperLogLog Implementation in Redis, pt.3

Redis: HyperLogLog の実装について その3

 

前回は sparse 表現のバイナリについて中身をざっと見た。

今回は HyperLogLog の目的である、カーディナリティを求める処理 pfcount についてその実装を確かめてみる。

 

カウントのコア部分の処理は hllCount 関数に記述されている。

 

カウントの求め方

論文によれば、カーディナリティの期待値 $E$ は以下のように求められる。

  • レジスタの個数を $m$ とする。Redis の場合 $$m = 2^{14} = 16384$$
  • このとき $$E = \frac{\alpha_{m}m^2}{\sum_{j=1}^{m} 2^{-M^{(j)}}}$$
  • ここで $$\alpha_{m} = \left(m \int_{0}^{\infty} \left(\text{log}_2 \left(\frac{2+u}{1+u}\right)\right)^m du\right)^{-1}$$
    Redis の場合、これは $\alpha_{m} \approx 0.7213 \cdot \left({1 + \frac{1.079}{m}}\right)^{-1} \approx 0.7213$ で近似される。
  • また、$M^{(j)}$ は $j$番目のレジスタの値である。

 

具体的な例

これまで見てきたように、Redis の HyperLogLog 型に "A", "B", "C" という 3つの値を追加すると、レジスタの状態は以下のようになる。

  • index:4477 -> 3 (C)
  • index:12352 -> 1 (A)
  • index:12964 -> 3 (B)
  • 上記以外の index -> 0

この例で実際に $E$ を求めてみる。

  • $2^0 = 1, \qquad 2^{-1} = 0.5, \qquad 2^{-3} = 0.125$ であるから $$\sum_{j=1}^{m} 2^{-M^{(j)}} = 1 \cdot (m-3) + 0.5 \cdot 1 + 0.125 \cdot 2 = 16381.75$$
  • したがって $$E = \frac{\alpha_{m}m^2}{\sum_{j=1}^{m} 2^{-M^{(j)}}} \approx \frac{0.7213 \cdot 16384^2}{16381.75} \approx 11819.40$$

実際のカーディナリティは 3 なので、大きなズレがあるように見える。

期待値の補正

このように、HyperLogLog は小さいカーディナリティに対して大きな誤差が出るという性質がある。
この問題を解決するため、Redis では一旦期待値を算出した後で、以下のルールに従ってその値を補正している。

  • i) $E \lt 2.5 \cdot m = 40960$ かつ 値が 0 であるレジスタが存在する場合

    以下の式 (Redis のコメントには「LINEARCOUNTING アルゴリズム」と書かれている) によって期待値を計算し直す。値が 0 であるレジスタの個数を $z$ として $$E := m \cdot \text{log} \left( \frac{m}{z} \right)$$

    上記の具体例では、$z = 16384 - 3 = 16381$ なので $$E := m \cdot \text{log} \left( \frac{m}{z} \right) = 16384 \cdot \text{log} \left( \frac{16384}{16381} \right) \approx 3.0003$$
    正しいカーディナリティ 3 を得ることができた。

  • ii) i)に該当せず $E \lt 72000$ の場合
    以下の多項式バイアスを適用して期待値を調整している。これは $m = 2^{14}$ の場合に限り適用可能。 $$ \begin{align} bias &= 5.9119 \cdot 10^{-18}E^4 - 1.4253 \cdot 10^{-12}E^3 + 1.2940 \cdot 10^{-7}E^2-5.2921 \cdot 10^{-3}E + 83.3216 \\ E :&= E - E \cdot \frac{bias}{100} \end{align}$$
  • iii) その他の場合は補正なし

こうして得られた値 $E$ を 64ビット非負整数に変換したものが、HyperLogLog型のカウントとして得られる値である。

 

Related Posts

6.06.2015

easy-scala-bench - Run sbt-jmh in Easy Way

easy-scala-bench - 手軽に sbt-jmh を実行する

 

ワン・ライナーないしは数行の Scala スクリプトに対して簡単に JMH でマイクロ・ベンチマークを実行できるようにシェルを書いた。

 

 

前提条件

  • 0.13 系の sbt がインストールされていること
  • Bash (/bin/bash) が使えること

 

使い方

  • リポジトリのクローン
    $ cd your/work/dir
    $ git clone https://github.com/mogproject/easy-scala-bench.git
    $ cd easy-scala-bench
  • 測定したい Scala コードを easy-scala-bench スクリプトの入力に与える
    $ echo 'for (i <- 1 to 1000) for (j <- 1 to 1000) i + j' | ./easy-scala-bench

    これだけ。

    入力をもとに自動的に Scala コード (src/main/scala/Bench.scala) が生成され、
    コマンド sbt 'run -i 3 -wi 3 -f 1 -t 1 easy_scala_bench.Bench' が実行される。

  • 測定対象外の準備コードを書くこともできる
    $ echo '
    val xs = (1 to 1000000).toList
    ====
    xs.length
    ' | ./easy-scala-bench

    準備用コードと計測用コードを「====」(イコール 4個 完全一致) という行で区切れば、区切り以降のコードだけが計測対象になる。

  • シェルスクプトに対してシンボリック・リンクを張っても正しく動作する

    たとえば

    ln -s path/to/easy-scala-bench /usr/local/bin/

    のようにパスの通る場所にリンクを作ってコマンド化してしまえば、実行ファイルのパスを意識する必要がなくなる。

 

実行例

$ echo 'for (i <- 1 to 1000) for (j <- 1 to 1000) i + j' | ./easy-scala-bench
(snip)
[info] Compiling 1 Scala source to /private/tmp/easy-scala-bench/target/scala-2.11/classes...
[info] Generating JMH benchmark Java source files...
Processing 3 classes from /private/tmp/easy-scala-bench/target/scala-2.11/classes with "reflection" generator
Writing out Java source to /private/tmp/easy-scala-bench/target/scala-2.11/generated-sources/jmh and resources to /private/tmp/easy-scala-bench/target/scala-2.11/classes
[info] Compiling generated JMH benchmarks...
[info] Compiling 1 Scala source and 9 Java sources to /private/tmp/easy-scala-bench/target/scala-2.11/classes...
[info] Running org.openjdk.jmh.Main -i 3 -wi 3 -f 1 -t 1 easy_scala_bench.Bench
[info] # JMH 1.9.1 (released 43 days ago)
[info] # VM invoker: /Library/Java/JavaVirtualMachines/jdk1.7.0_75.jdk/Contents/Home/jre/bin/java
[info] # VM options: 
[info] # Warmup: 3 iterations, 1 s each
[info] # Measurement: 3 iterations, 1 s each
[info] # Timeout: 10 min per iteration
[info] # Threads: 1 thread, will synchronize iterations
[info] # Benchmark mode: Throughput, ops/time
[info] # Benchmark: easy_scala_bench.Bench.bench
[info]
[info] # Run progress: 0.00% complete, ETA 00:00:06
[info] # Fork: 1 of 1
[info] # Warmup Iteration   1: 120.743 ops/s
[info] # Warmup Iteration   2: 214.615 ops/s
[info] # Warmup Iteration   3: 222.462 ops/s
[info] Iteration   1: 226.371 ops/s
[info] Iteration   2: 230.060 ops/s
[info] Iteration   3: 226.733 ops/s
[info]
[info]
[info] Result "bench":
[info]   227.722 ±(99.9%) 37.095 ops/s [Average]
[info]   (min, avg, max) = (226.371, 227.722, 230.060), stdev = 2.033
[info]   CI (99.9%): [190.626, 264.817] (assumes normal distribution)
[info]
[info]
[info] # Run complete. Total time: 00:00:06
[info]
[info] Benchmark     Mode  Cnt    Score    Error  Units
[info] Bench.bench  thrpt    3  227.722 ± 37.095  ops/s
[success] Total time: 17 s, completed Jun 6, 2015 11:08:17 PM
$ echo '
val xs = (1 to 1000000).toList
xs.length
' | ./easy-scala-bench
(snip)
[info] # Run complete. Total time: 00:00:06
[info]
[info] Benchmark     Mode  Cnt   Score    Error  Units
[info] Bench.bench  thrpt    3  24.990 ± 71.849  ops/s
(snip)
$ echo '
val xs = (1 to 1000000).toList
====
xs.length
' | ./easy-scala-bench
(snip)
[info] # Run complete. Total time: 00:00:07
[info]
[info] Benchmark     Mode  Cnt    Score    Error  Units
[info] Bench.bench  thrpt    3  212.187 ± 35.787  ops/s
(snip)
$ echo '
val xs = (1 to 1000000).toVector
====
xs.length
' | ./easy-scala-bench
(snip)
[info] # Run complete. Total time: 00:00:06
[info]
[info] Benchmark     Mode  Cnt           Score           Error  Units
[info] Bench.bench  thrpt    3  1291414027.800 ± 568908317.371  ops/s
(snip)