※Rackサーバの方のUnicornではない.

はじめに

 UnicornQEMUベースの軽量,マルチプラットフォーム・マルチアーキテクチャ・JIT対応のCPUエミュレータ.周辺機器をエミュレーションしないため用途は限られるが,GoやPythonなど複数言語のバインディングを備えている.現在システムセキュリティ分野で最も注目されているOSSのひとつと言っても過言ではなく,AsiaCCS 2016で発表されたROPチェーン解析ツールROPMEMUや,遺伝的アルゴリズムによってROPチェーンを自動生成するツールroperのバックエンドとして用いられたり,Unicornと同じように注目を集めているバイナリ解析ツールangrとの連携が進められたりと,一大コミュニティを形成しつつある.
 コンセプトの説明はBlack Hat USA 2015の発表スライドに譲るとして,ここではその利用方法と内部実装にふれる.

Pythonバインディング

 情報セキュリティに興味があり,セキュリティ関係の仕事に携わりたいという人には,Pythonはうってつけの言語だ.その理由は,リバースエンジニアリングと攻撃コード作成のためのライブラリが充実しているためだ.
       — Justin Seitz『サイバーセキュリティプログラミング———Pythonで学ぶハッカーの思考』序文

 リバースエンジニアリングというタスクにUnicornを用いることを考える.他言語を選ぶ積極的な理由やPythonへのアレルギーがなければ,他のライブラリとの連携も考慮して,Pythonバインディングを活用すべきだろう.

エミュレーション

 最もシンプルな用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# unicorn/bindings/python/sample_x86.pyを抜粋・改変

from __future__ import print_function # Python 2.7を利用
from unicorn import *
from unicorn.x86_const import *

# エミュレーション対象の機械語
X86_CODE32 = b"\x41\x4a" # INC ecx; DEC edx

# 各基本ブロックに対するコールバック
def hook_block(uc, address, size, user_data):
print(">>> Tracing basic block at 0x%x, block size = 0x%x" %(address, size))

# 各命令に対するコールバック
def hook_code(uc, address, size, user_data):
print(">>> Tracing instruction at 0x%x, instruction size = %u" %(address, size))

# x86 32bitのコードをエミュレーション
def test_i386():
print("Emulate i386 code")
try:
# x86-32bitモードでエミュレータを初期化
mu = Uc(UC_ARCH_X86, UC_MODE_32)

# エミュレーション用に2MBのメモリを割り当て
mu.mem_map(ADDRESS, 2 * 1024 * 1024)

# 割り当てられたメモリに機械語を書き込み
mu.mem_write(ADDRESS, X86_CODE32)

# レジスタ初期化
mu.reg_write(UC_X86_REG_ECX, 0x1234) # エミュレーション中に加算される
mu.reg_write(UC_X86_REG_EDX, 0x7890) # エミュレーション中に減算される

# 各基本ブロックに対するコールバックを設定
mu.hook_add(UC_HOOK_BLOCK, hook_block)

# 各命令に対するコールバックを設定
mu.hook_add(UC_HOOK_CODE, hook_code)

# エミュレーション開始
mu.emu_start(ADDRESS, ADDRESS + len(X86_CODE32))

# レジスタの表示
print(">>> Emulation done. Below is the CPU context")

r_ecx = mu.reg_read(UC_X86_REG_ECX)
r_edx = mu.reg_read(UC_X86_REG_EDX)
print(">>> ECX = 0x%x" %r_ecx)
print(">>> EDX = 0x%x" %r_edx)

# メモリから命令列を読む
tmp = mu.mem_read(ADDRESS, 2)
print(">>> Read 2 bytes from [0x%x] =" %(ADDRESS), end="")
for i in tmp:
print(" 0x%x" %i, end="")
print("")

except UcError as e:
print("ERROR: %s" % e)

if __name__ == '__main__':
test_i386()

 これを実行すると次のような出力が得られる.

1
Emulate i386 code
>>> Tracing basic block at 0x1000000, block size = 0x2
>>> Tracing instruction at 0x1000000, instruction size = 1
>>> Tracing instruction at 0x1000001, instruction size = 1
>>> Emulation done. Below is the CPU context
>>> ECX = 0x1235
>>> EDX = 0x788f
>>> Read 2 bytes from [0x1000000] = 0x41 0x4a

 はい.
 なおエミュレーション時に各命令のトレースを得たければ,同じ開発陣による逆アセンブラCapstoneのPythonバインディングを併用して(詳細は割愛)次のようなフックを用意すればよい:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
from capstone import *
...
class SimpleEngine:
def __init__(self):
self.capmd = Cs(CS_ARCH_X86, CS_MODE_32) # アーキテクチャ指定
def disas_single(self, data):
for i in self.capmd.disasm(data, 16): # 逆アセンブル
print("\t%s\t%s" % (i.mnemonic, i.op_str))
break

disasm = SimpleEngine()
...
def hook_code(uc, address, size, user_data):
print(">>> Tracing instruction at 0x%x, instruction size = %u, " %(address, size), end="")
# メモリから実行される命令を読む
ins = uc.mem_read(address, size)
disasm.disas_single(str(ins))

 実行すると命令のトレースが得られる.

1
...
>>> Tracing instruction at 0x1000000, instruction size = 1,     inc     ecx
>>> Tracing instruction at 0x1000001, instruction size = 1,     dec     edx
...

 このように,Unicornではプラグイン側のエントリポイントからQEMUの機能を呼び出していくことになる.

Pythonバインディングの内部

 この内部では,ctypesによる共有ライブラリのロードが行われている.unicorn/bindings/python/unicorn/unicorn.pyを見よう:

1
2
3
4
5
6
7
8
9
10
11
12
13
for _lib in _all_libs:
try:
if _lib == "unicorn.dll":
for dll in _all_windows_dlls: # load all the rest DLLs first
_lib_file = os.path.join(_lib_path, dll)
if os.path.exists(_lib_file):
ctypes.cdll.LoadLibrary(_lib_file)
_lib_file = os.path.join(_lib_path, _lib)
_uc = ctypes.cdll.LoadLibrary(_lib_file)
_found = True
break
except OSError:
pass

 ここで_ucunicorn.dll(環境によって異なるがいずれにせよ共有ライブラリ)がロードされている.
 さて,これまで利用してきた関数のプロトタイプ宣言はunicorn/bindings/python/unicorn/unicorn.pyにある:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# setup all the function prototype
def _setup_prototype(lib, fname, restype, *argtypes):
getattr(lib, fname).restype = restype
getattr(lib, fname).argtypes = argtypes

ucerr = ctypes.c_int
uc_engine = ctypes.c_void_p
uc_hook_h = ctypes.c_size_t

_setup_prototype(_uc, "uc_version", ctypes.c_uint, ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_int))
_setup_prototype(_uc, "uc_arch_supported", ctypes.c_bool, ctypes.c_int)
_setup_prototype(_uc, "uc_open", ucerr, ctypes.c_uint, ctypes.c_uint, ctypes.POINTER(uc_engine))
_setup_prototype(_uc, "uc_close", ucerr, uc_engine)
_setup_prototype(_uc, "uc_strerror", ctypes.c_char_p, ucerr)
_setup_prototype(_uc, "uc_errno", ucerr, uc_engine)
_setup_prototype(_uc, "uc_reg_read", ucerr, uc_engine, ctypes.c_int, ctypes.c_void_p)
_setup_prototype(_uc, "uc_reg_write", ucerr, uc_engine, ctypes.c_int, ctypes.c_void_p)
_setup_prototype(_uc, "uc_mem_read", ucerr, uc_engine, ctypes.c_uint64, ctypes.POINTER(ctypes.c_char), ctypes.c_size_t)
_setup_prototype(_uc, "uc_mem_write", ucerr, uc_engine, ctypes.c_uint64, ctypes.POINTER(ctypes.c_char), ctypes.c_size_t)
_setup_prototype(_uc, "uc_emu_start", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_size_t)
_setup_prototype(_uc, "uc_emu_stop", ucerr, uc_engine)
_setup_prototype(_uc, "uc_hook_del", ucerr, uc_engine, uc_hook_h)
_setup_prototype(_uc, "uc_mem_map", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_uint32)
_setup_prototype(_uc, "uc_mem_map_ptr", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_uint32, ctypes.c_void_p)
_setup_prototype(_uc, "uc_mem_unmap", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_size_t)
_setup_prototype(_uc, "uc_mem_protect", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_uint32)
_setup_prototype(_uc, "uc_query", ucerr, uc_engine, ctypes.c_uint32, ctypes.POINTER(ctypes.c_size_t))

# uc_hook_add is special due to variable number of arguments
_uc.uc_hook_add = _uc.uc_hook_add
_uc.uc_hook_add.restype = ucerr

UC_HOOK_CODE_CB = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_void_p)
UC_HOOK_MEM_INVALID_CB = ctypes.CFUNCTYPE(
ctypes.c_bool, uc_engine, ctypes.c_int,
ctypes.c_uint64, ctypes.c_int, ctypes.c_int64, ctypes.c_void_p
)
UC_HOOK_MEM_ACCESS_CB = ctypes.CFUNCTYPE(
None, uc_engine, ctypes.c_int,
ctypes.c_uint64, ctypes.c_int, ctypes.c_int64, ctypes.c_void_p
)
UC_HOOK_INTR_CB = ctypes.CFUNCTYPE(
None, uc_engine, ctypes.c_uint32, ctypes.c_void_p
)
UC_HOOK_INSN_IN_CB = ctypes.CFUNCTYPE(
ctypes.c_uint32, uc_engine, ctypes.c_uint32, ctypes.c_int, ctypes.c_void_p
)
UC_HOOK_INSN_OUT_CB = ctypes.CFUNCTYPE(
None, uc_engine, ctypes.c_uint32,
ctypes.c_int, ctypes.c_uint32, ctypes.c_void_p
)
UC_HOOK_INSN_SYSCALL_CB = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_void_p)

 メモリ・レジスタを読み書きできることがわかる.
 さきほどは基本ブロック・命令単位のコールバック関数———フック———を設置した.その他のフックも次のように書ける:

1
2
3
4
5
6
7
8
9
10
11
12
mu.hook_add(UC_HOOK_BLOCK, hook_block) # 基本ブロック単位
mu.hook_add(UC_HOOK_CODE, hook_code) # 命令単位
mu.hook_add(UC_HOOK_CODE, hook_code, None, ADDRESS, ADDRESS+20) # 指定範囲[ADDRESS, ADDRESS+20]の全命令
mu.hook_add(UC_HOOK_INSN, hook_in, None, 1, 0, UC_X86_INS_IN) # IN命令
mu.hook_add(UC_HOOK_INSN, hook_out, None, 1, 0, UC_X86_INS_OUT) # OUT命令
mu.hook_add(UC_HOOK_MEM_WRITE, hook_mem_access) # メモリ書き込み
mu.hook_add(UC_HOOK_MEM_READ, hook_mem_access) # メモリ読み込み
mu.hook_add(UC_HOOK_MEM_READ | UC_HOOK_MEM_WRITE, hook_mem_access) # その両方
mu.hook_add(UC_HOOK_MEM_READ_UNMAPPED | UC_HOOK_MEM_WRITE_UNMAPPED, hook_mem_invalid) # 無効なメモリアクセス
mu.hook_add(UC_HOOK_INSN, hook_syscall, None, 1, 0, UC_X86_INS_SYSCALL) # システムコール
mu.hook_add(UC_HOOK_INTR, hook_intr) # 割り込み
...

 こうしたフックや,これまで用いてきたレジスタやメモリの読み書き・エミュレーションといった機能はUcクラスのメソッドとして用意されている:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class Uc(object):
def __init__(self, arch, mode):
# verify version compatibility with the core before doing anything
(major, minor, _combined) = uc_version()
if major != uc.UC_API_MAJOR or minor != uc.UC_API_MINOR:
self._uch = None
# our binding version is different from the core's API version
raise UcError(uc.UC_ERR_VERSION)

self._arch, self._mode = arch, mode
self._uch = ctypes.c_void_p()
status = _uc.uc_open(arch, mode, ctypes.byref(self._uch))
if status != uc.UC_ERR_OK:
self._uch = None
raise UcError(status)
# internal mapping table to save callback & userdata
self._callbacks = {}
self._ctype_cbs = {}
self._callback_count = 0

...

# emulate from @begin, and stop when reaching address @until
def emu_start(self, begin, until, timeout=0, count=0):
status = _uc.uc_emu_start(self._uch, begin, until, timeout, count)
if status != uc.UC_ERR_OK:
raise UcError(status)

...

# return the value of a register
def reg_read(self, reg_id):
if self._arch == uc.UC_ARCH_X86:
if reg_id in [x86_const.UC_X86_REG_IDTR, x86_const.UC_X86_REG_GDTR, x86_const.UC_X86_REG_LDTR, x86_const.UC_X86_REG_TR]:
reg = uc_x86_mmr()
status = _uc.uc_reg_read(self._uch, reg_id, ctypes.byref(reg))
if status != uc.UC_ERR_OK:
raise UcError(status)
return reg.selector, reg.base, reg.limit, reg.flags
if reg_id in range(x86_const.UC_X86_REG_FP0, x86_const.UC_X86_REG_FP0+8):
reg = uc_x86_float80()
status = _uc.uc_reg_read(self._uch, reg_id, ctypes.byref(reg))
if status != uc.UC_ERR_OK:
raise UcError(status)
return reg.mantissa, reg.exponent

# read to 64bit number to be safe
reg = ctypes.c_int64(0)
status = _uc.uc_reg_read(self._uch, reg_id, ctypes.byref(reg))
if status != uc.UC_ERR_OK:
raise UcError(status)
return reg.value

...

# read data from memory
def mem_read(self, address, size):
data = ctypes.create_string_buffer(size)
status = _uc.uc_mem_read(self._uch, address, data, size)
if status != uc.UC_ERR_OK:
raise UcError(status)
return bytearray(data)

...

# add a hook
def hook_add(self, htype, callback, user_data=None, begin=1, end=0, arg1=0):
_h2 = uc_hook_h()

# save callback & user_data
self._callback_count += 1
self._callbacks[self._callback_count] = (callback, user_data)
cb = None

if htype == uc.UC_HOOK_INSN: # 特定の命令に応じた内部処理
insn = ctypes.c_int(arg1)
if arg1 == x86_const.UC_X86_INS_IN: # IN命令
cb = ctypes.cast(UC_HOOK_INSN_IN_CB(self._hook_insn_in_cb), UC_HOOK_INSN_IN_CB)
if arg1 == x86_const.UC_X86_INS_OUT: # OUT命令
cb = ctypes.cast(UC_HOOK_INSN_OUT_CB(self._hook_insn_out_cb), UC_HOOK_INSN_OUT_CB)
if arg1 in (x86_const.UC_X86_INS_SYSCALL, x86_const.UC_X86_INS_SYSENTER): # SYSCALL/SYSENTER命令
cb = ctypes.cast(UC_HOOK_INSN_SYSCALL_CB(self._hook_insn_syscall_cb), UC_HOOK_INSN_SYSCALL_CB)
status = _uc.uc_hook_add(
self._uch, ctypes.byref(_h2), htype, cb,
ctypes.cast(self._callback_count, ctypes.c_void_p),
ctypes.c_uint64(begin), ctypes.c_uint64(end), insn
)
elif htype == uc.UC_HOOK_INTR: # 割り込み
cb = ctypes.cast(UC_HOOK_INTR_CB(self._hook_intr_cb), UC_HOOK_INTR_CB)
status = _uc.uc_hook_add(
self._uch, ctypes.byref(_h2), htype, cb,
ctypes.cast(self._callback_count, ctypes.c_void_p),
ctypes.c_uint64(begin), ctypes.c_uint64(end)
)
else:
if htype in (uc.UC_HOOK_BLOCK, uc.UC_HOOK_CODE): # 基本ブロック・命令単位
# set callback with wrapper, so it can be called
# with this object as param
cb = ctypes.cast(UC_HOOK_CODE_CB(self._hookcode_cb), UC_HOOK_CODE_CB)
status = _uc.uc_hook_add(
self._uch, ctypes.byref(_h2), htype, cb,
ctypes.cast(self._callback_count, ctypes.c_void_p),
ctypes.c_uint64(begin), ctypes.c_uint64(end)
)
elif htype & (uc.UC_HOOK_MEM_READ_UNMAPPED | # 無効なメモリアクセス
uc.UC_HOOK_MEM_WRITE_UNMAPPED |
uc.UC_HOOK_MEM_FETCH_UNMAPPED |
uc.UC_HOOK_MEM_READ_PROT | # 保護された領域へのメモリアクセス
uc.UC_HOOK_MEM_WRITE_PROT |
uc.UC_HOOK_MEM_FETCH_PROT):
cb = ctypes.cast(UC_HOOK_MEM_INVALID_CB(self._hook_mem_invalid_cb), UC_HOOK_MEM_INVALID_CB)
status = _uc.uc_hook_add(
self._uch, ctypes.byref(_h2), htype, cb,
ctypes.cast(self._callback_count, ctypes.c_void_p),
ctypes.c_uint64(begin), ctypes.c_uint64(end)
)
else: # メモリ読み書き
cb = ctypes.cast(UC_HOOK_MEM_ACCESS_CB(self._hook_mem_access_cb), UC_HOOK_MEM_ACCESS_CB)
status = _uc.uc_hook_add(
self._uch, ctypes.byref(_h2), htype, cb,
ctypes.cast(self._callback_count, ctypes.c_void_p),
ctypes.c_uint64(begin), ctypes.c_uint64(end)
)

# save the ctype function so gc will leave it alone.
self._ctype_cbs[self._callback_count] = cb

if status != uc.UC_ERR_OK:
raise UcError(status)

return _h2.value

おわりに

 PythonからUnicornを利用する方法を紹介した.
 結局のところctypesでラップされており,こうした機能がどのようにして実行されるのかは本体のコードを読まなければ理解できない.そういうわけで,そのうちUnicorn本体の実装を読んでいくことにする.