Introduction

As you know, IDAPython is quite useful. And Triton concolic execution engine has python binding. Then… why not integrate them? I tried to stand on the shoulders of giants.

Backward Program Slicing

Roughly speaking, program slicing is a method to extract subset of program which is relevant to given statement. Here is an excerpt from M. Weiser. ICSE’81:

Starting from a subset of a program’s behavior, slicing reduces that program to a minimal form which still produces that behavior. The reduced program, called a “slice”, is an independent program guaranteed to faithfully represent the original program within the domain of the specified subset of behavior.

Kudos to Jonathan Salwan, we can easily apply backward program slicing to binary analysis process with minor modification of backward_slicing.py and proving_opaque_predicates.py. I wrote a simple, tiny glue between Triton and IDA Pro:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#!/usr/bin/env python
#coding: utf-8
from idc import *
from idaapi import *
from triton import *
ctx = TritonContext()
regs = {
'eax': REG.X86.EAX,
'ebx': REG.X86.EBX,
'ecx': REG.X86.ECX,
'edx': REG.X86.EDX,
}
def symbolization_init():
Triton.convertRegisterToSymbolicVariable(Triton.registers.eax)
Triton.convertRegisterToSymbolicVariable(Triton.registers.ebx)
Triton.convertRegisterToSymbolicVariable(Triton.registers.ecx)
Triton.convertRegisterToSymbolicVariable(Triton.registers.edx)
return
def opaque_predicate_detection(pc):
ctx.setArchitecture(ARCH.X86)
ctx.setAstRepresentationMode(AST_REPRESENTATION.PYTHON)
symbolization_init()
ast_ctx = ctx.getAstContext()
while True:
instruction = Instruction()
instruction.setAddress(pc)
opcode = GetManyBytes(pc, ItemSize(pc)) # Disassemble with IDA Pro
instruction.setOpcode(opcode)
ctx.processing(instruction) # Emulate instruction without Pintool
for se in instruction.getSymbolicExpressions():
se.setComment(str(instruction))
if str(instruction.getDisassembly()).split()[0] == 'cmp': # if instruction.isCompare():
reg = str(instruction.getOperands()[0]).split(':')[0] # Get first operand
if reg in regs:
Expr = ctx.getSymbolicRegisters()[regs[reg]]
slicing = ctx.sliceExpressions(Expr)
for k, v in sorted(slicing.items()):
print v.getComment()
print instruction
elif instruction.isBranch():
print instruction
if instruction.isConditionTaken():
print 'branch will taken'
else:
print 'branch will not taken'
break
pc = NextHead(pc)
print '-' * 50
def main():
ea = ScreenEA() # Get address corresponding to cursor
opaque_predicate_detection(ea)
if __name__ == '__main__':
view = main()

Showcase

The snippet extracts subset of program which is relevant to branch condition. We can run this from File -> Script file in IDA Pro menu.

Conditional Branch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
.text:004074DD loc_4074DD:
.text:004074DD 040 mov dword ptr [edi+18h], 0Ah
.text:004074E4 040 mov dword ptr [edi+14h], 0
.text:004074EB 040 mov word ptr [edi+8], 8235h
.text:004074F1 040 mov dword ptr [edi+0Ch], 3E8h
.text:004074F8 040 mov dword ptr [edi+10h], 0BB8h
.text:004074FF 040 sub esp, 4
.text:00407502 044 mov [esp+40h+Memory], 0B8h ; Size ; cursor
.text:00407509 044 call ??2@YAPAXI@Z ; operator new(uint)
.text:0040750E 044 add esp, 4
.text:00407511 040 mov [esi+18h], eax
.text:00407514 040 mov eax, dword_591EE4
.text:00407519 040 mov ecx, dword_591EE0
.text:0040751F 040 imul eax, eax
.text:00407522 040 imul edx, eax, 7
.text:00407525 040 dec edx
.text:00407526 040 mov eax, ecx
.text:00407528 040 imul eax, eax
.text:0040752B 040 cmp edx, eax
.text:0040752D 040 jz short loc_407538

becomes:

1
2
3
4
5
6
7
0x407514: mov eax, dword ptr [0x591ee4]
0x40751f: imul eax, eax
0x407522: imul edx, eax, 7
0x407525: dec edx
0x40752b: cmp edx, eax
0x40752d: je 0x407538
branch will not taken

Unconditional Branch

1
2
3
4
5
6
7
8
9
.text:004078C3 loc_4078C3:
.text:004078C3 000 mov esp, edi ; cursor
.text:004078C5 000 mov eax, dword_591EAC
.text:004078CA 000 lea ecx, [eax+4]
.text:004078CD 000 mov edx, eax
.text:004078CF 000 sar edx, cl
.text:004078D1 000 lea eax, [eax+edx*2]
.text:004078D4 000 mov dword_591EAC, eax
.text:004078D9 000 jmp short loc_4

becomes:

1
2
0x4078d9: jmp 0x40789a
branch will taken

Looks nice.

Last Words

Triton’s emulation iteration is compatible to IDAPython manner. Therefore, The combination of IDA Pro and Triton is pretty good.

Cheers,

Introduction

IDAPython is a powerful feature of IDA Pro, and there are many open-sourced IDAPython projects. However, we cannot use every GUI-based IDAPython script due to some Qt-related breaking changes between IDA Pro 6.8 and 6.9 or later. The main problem is about migrating no longer supported PySide code to PyQt5.

Recently I ported PySide code within idasec–one of the most sophisticated deobfuscation frameworks, which tackles opaque predicates and call stack tampering in terms of infeasibility questions, by utilizing Backward-Bounded Dynamic Symbolic Execution proposed in the remarkably well written paper S. Bardin et al. IEEE S&P’17–to PyQt5.

That’s why I decided to write this blog post for a note to self and for someone trying to do similar thing.

Related Work

There are 2 guidances to migrate PySide code to PyQt5:

Please read them before. I only give supplemental information in addition to predecessors.

How to Migrate

Now let’s get started.

Change QtGui methods to QtWidgets

Most methods in QtGui migrated to QtWidgets. Therefore,

1
from PySide import QtGui, QtCore

becomes:

1
from PyQt5 import QtCore, QtGui, QtWidgets

As an example, QTextEdit described in Hex Blog. In additions, the methods to be rewritten are as follows:

  • QtWidgets.QLayout
  • QtWidgets.QVBoxLayout
  • QtWidgets.QHBoxLayout
  • QtWidgets.QWidget
  • QtWidgets.QTableWidget
  • QtWidgets.QListWidget
  • QtWidgets.QTabWidget
  • QtWidgets.QDockWidget
  • QtWidgets.QTreeWidget
  • QtWidgets.QTreeWidgetItem
  • QtWidgets.QPushButton
  • QtWidgets.QRadioButton
  • QtWidgets.QToolButton
  • QtWidgets.QButtonGroup
  • QtWidgets.QGroupBox
  • QtWidgets.QSpinBox
  • QtWidgets.QCheckBox
  • QtWidgets.QComboBox
  • QtWidgets.QTextEdit
  • QtWidgets.QLineEdit
  • QtWidgets.QApplication
  • QtWidgets.QLabel
  • QtWidgets.QSizePolicy
  • QtWidgets.QMenu
  • QtWidgets.QFrame
  • QtWidgets.QProgressBar
  • QtWidgets.QStyle
  • QtWidgets.QSpacerItem
  • QtWidgets.QScrollArea
  • QtWidgets.QSplitter
  • There might be more…

My experience says that other than the following 3 methods may be rewritten:

  • QtGui.QPixmap
  • QtGui.QIcon
  • QtGui.QFont

idacute may overwrite all of QtGui methods, so I think there still needs to be manual works.

Overwrite _fromUtf8

We also need to overwrite _fromUtf8.

1
2
3
4
try:
_fromUtf8 = QtCore.QString.fromUtf8
except AttributeError:
    _fromUtf8 = lambda s: s

Others

These issues are described by predecessors:

  • Handling SIGNAL
  • Change FormToPySideWidget to FormToPyQtWidget
  • Change setResizeMode to setSectionResizeMode

Conclusion

This time, I was able to run idasec on IDA Pro 7.0 with some bug fixes and dirty patches – like this cool video:

If you are an IDA Pro 7.0 user, note that other backward-compatibility issue described in IDA: IDAPython backward-compatibility with 6.95 APIs will occur.

Enjoy!

HAI DOMO VIRTUAL YOUTUBER KIZUNA AI DESU. I’m still working on my English.

 Security meets Machine Learningという勉強会にて,上記のタイトルで発表した.資料はこちら:

 謎の力が働いて会社からの発表になっておりますが,機械学習の研究をしているわけではありません.既存研究の再現実装を試みているとこれ中国語の部屋じゃんという気持ちになる.
 ともあれ,これまで各種資料はただSpeakerDeckに載せるだけだったのを今後はブログから一元的に参照できるようにします.

 なにもかも忘れかけている.

はじめに

 z3pyはSMTソルバZ3のPythonバインディング.たとえば

1
2
3
4
5
...
if(x * 2 + 3 * y == 4
&& x * 3 + y == -1){
puts("congrats!");
}

のようなプログラムに対して

1
2
3
4
5
6
7
8
9
10
11
from z3 import *
x = Int('x')
y = Int('y')
s = Solver()
s.add(x * 2 + 3 * y == 4)
s.add(x * 3 + y == -1)
print(s.check())
print(s.model())

としてやれば期待される入力値[x = -1, y = 2]を算出できる.これだけではだからなに? という感じだが.シンボリック実行は各命令とメモリの状態からこの割当を自動生成するものだと思えばよい.これがangrやらTritonやらmiasmやらmanticoreやら,その他いまどきのバイナリ解析ツール群の基盤となっているというわけです.

参考になったサイト

 で,メモっとかないと忘れるので.SAT/SMTソルバのしくみやシンボリック実行全般まで広げると膨大になるということで,z3pyに限っています.前者については高速SATソルバーの原理 - 基盤(S)離散構造処理系プロジェクト[PDF]SAT/SMTソルバの仕組み - SlideShareを読むとよいでしょう.いまどきのSAT/SMTソルバにはVSIDSやらLubyリスタートやらいろいろな工夫が盛り込まれているが,とりあえずユーザとしてはDPLLとCDCLさえ抑えておけば問題ないはず.後者,シンボリック実行全般に関するおすすめの文献は秘密.Pythonバインディング以外の記事,angrやTritonなどサードパーティのツール群に関する記事も省いてある.

ウェブサイト 概要
CTF/Toolkit/z3py - 電気通信大学MMA 日本語で書かれた入門資料.これには書かれてないがBitVecSortも便利.
Theorem prover, symbolic execution and practical reverse-engineering z3pyのチュートリアル.まずはこの通りに手を動かす.
Breaking Kryptonite’s Obfuscation: A Static Analysis Approach Relying on Symbolic Execution シンボリック実行の最小構成の実装.
0vercl0k/z3-playground 上2つの資料で参照されているソースコード群.
Solving a simple crackme using Z3 – Aneesh Dogra’s Blog 簡単なcrackmeのwriteup.
Playing with Z3, hacking the serial check. – rosako’s blog 簡単なcrackmeのwriteup.
Using SAT and SMT to defeat simple hashing algorithms - LSE Blog オレオレハッシュ関数の解析.
Reversing the petya ransomware with constraint solvers ランサムウェアPetyaのSalsa実装の不備を突くdecryptor.
thomasjball/PyExZ3 シンボリック実行の実装例.z3公式からリンクが貼られている.symbolic/以下,z3_wrap.py, loader.pyが参考になる.
Quick introduction into SAT/SMT solvers and symbolic execution (DRAFT) [PDF] いろいろリンク貼ったけどこれだけ読めばいい.SMTソルバの紹介.数独の解き方.マインスイーパの自動化.デコンパイラ.難読化解除.シンボリック実行.ハッシュ関数の解析.全部載っている.Dennis Yurichev氏,Reverse Engineering for Beginnersも書いていて,慈善事業家か?

 はてなブックマークを使っていればよかったのではという気がしてきた.

おわりに

 z3py便利最高ですね.みなさんは便利最高ですか?
 これで次のようなCTFの問題を解くことができます:

問題 大会
Unbreakable Enterprise Product Activation Google CTF 2016
Ropsynth SECCON 2016 Online CTF
baby-re DEF CON CTF Qualifier 2016
amandhj DEF CON CTF Qualifier 2016
Pepperidge Farm DEF CON CTF Qualifier 2017

 まあ明らかにangr使ったほうが楽.とりあえずいくつかやってみただけで,ほかにもたくさんあると思う.
 Bareflank Hypervisorを読むのはいつになるかわかりません.

 いまさらながら,情報セキュリティ系論文紹介 Advent Calendar 2016あるいはBitVisor Advent Calendar 2016に投稿されるはずだった文章を供養する.

はじめに

 本稿では,シン,すなわち薄いハイパーバイザ (thin hypervisor) の動向を紹介する.
 薄いハイパーバイザとは,小規模であることを志向したハイパーバイザだ——ということにしておこう.小規模というのは,一部のイベントのみトラップするということだ.メリットとしては,実装・学習コストやオーバーヘッド,TCB (trusted computing base) を削減できる点が挙げられる.
 用語の初出はBitVisorの論文[T. Shinagawa, et al. VEE’09]だが,多くはWindows向けルートキットとして開発されたBlue Pillという手法[J. Rutkowska. Black Hat USA’06]の流れを汲んでいる.そのため,ブート時からゲストを掌握するのではなく,のちほど——あえてchain of trustの構築を放棄して——カーネルドライバとしてロードされるものが一般的である.そのほか,単一のゲストのみ対象とする,セキュリティを意識しているといった傾向が見られる.
 前編となる今回は,3種類の薄いハイパーバイザを見ていく.

ksm

リポジトリ https://github.com/asamy/ksm
形態 カーネルドライバ
仮想化支援機能 Intel VT-x with EPT
サポートしている環境 Windows 7/8/8.1/10, Linux
言語 C, アセンブリ言語
ライセンス GNU GPL v2

 ksmは高速,拡張可能かつ小規模であることを旨としているハイパーバイザで,アンチウイルスソフトウェアやサンドボックスへの利用を意識して開発されている.機能は以下の通り:

  • IDT Shadowing
  • EPT violation #VE (Broadwell以降)
  • EPTP switching VMFUNC (Haswell以降,もしサポートされていなければVMCALLで代替)
  • Builtin Userspace physical memory sandboxer (ビルドオプション)
  • Builtin Introspection engine (ビルドオプション)
  • APIC virtualization (実験的機能)
  • VMX Nesting (実験的機能)

 主なソースコードは以下の通り:

main_nt.c, main_linux.c カーネルドライバのエントリポイント,ioctlディスパッチ
ksm.c ハイパーバイザ全体の初期化
vmx.S, vmx.asm IDTやEPT violationのトラップ,ゲストのレジスタ退避
vcpu.c VMCSの読み書き
exit.c VMExitのハンドラ
um/um.c ユーザーランドのエージェント
sandbox.c サンドボックス機能
introspect.c イントロスペクション機能
epage.c EPTフック機能
mm.c メモリ管理
hotplug.c CPUホットプラグに関する処理

 カーネルドライバをロードし,ユーザーランドのエージェントからioctlを発行,サンドボックス機能またはイントロスペクション機能,EPT機能を呼び出すという流れになっている.詳細はDocumentation/SPEC.rstを参照のこと.

SimpleVisor

リポジトリ https://github.com/ionescu007/SimpleVisor
形態 カーネルドライバ
仮想化支援機能 Intel VT-x with EPT, AMD-V
サポートしている環境 Windows 8/8.1/10, UEFI
言語 C, アセンブリ言語
ライセンス 2 clause BSD

 SimpleVisorは,名前の通り極力シンプルであることをめざしたハイパーバイザだ.全体で1700行程度.開発者はWindows Internalsの著者のひとりで,README.mdにはこう書かれている:

Have you always been curious on how to build a hypervisor? Has Intel’s documentation (the many hundreds of pages) gotten you down? Have the samples you’ve found online just made things more confusing, or required weeks of reading through dozens of thousands of lines and code? If so, SimpleVisor might be the project for you.

 そういうわけで,SimpleVisorはIntel SDM (とくに,ハイパーバイザまわりはVol. 3C) の解読にうんざりした人向けだ.学習にはもってこい.CPUID, INVD, VMX, XSETBVのみトラップするようになっており,ネストには対応していない.しかし,XenやBochsでさえ追いついていない最新の仮想化支援機能にいちはやく追随しようとしている.
 主なソースコードは以下の通り:

nt/shvos.c カーネルドライバのエントリポイント
shv.c VMExit/VMEntryのコールバック関数の登録
shvvp.c コールバック関数の実体,仮想CPUの初期化
shvvmx.c VMCSの初期化
shvvmxhv.c VMExitのハンドラ
shvutil.c GDTの変換

 シンプル.

HyperPlatform

リポジトリ https://github.com/tandasat/HyperPlatform
形態 カーネルドライバ
仮想化支援機能 Intel VT-x with EPT
サポートしている環境 Windows 7/8.1/10
言語 C++, アセンブリ言語
ライセンス MIT

 HyperPlatformは,カーネルランドで動くコード,すなわちWindows向けルートキットやWindowsカーネル自体の解析を目的として開発されているハイパーバイザ.物理メモリと仮想メモリへのアクセス,関数呼び出し,命令単位のコード実行を監視できるようになっている[S. Tanda. REcon’16]
 主なソースコードは以下の通り:

driver.cpp カーネルドライバのエントリポイント,各種コールバック関数の登録
log.cpp ログ出力
global_object.cpp グローバル変数の初期化
performance.cpp パフォーマンス計測
util.cpp PTE_BASEの取得,メモリアドレス変換,VMCALLのラッパなど
power_callback.cpp 電源状態のコールバック関数
hotplug_callback.cpp CPUホットプラグのコールバック関数
vm.cpp 仮想CPUの初期化,VMCSの読み書き
ept.cpp EPTの構成
vmm.cpp 命令のトラップ,VMExitのハンドラ
Arch/x64/x64.asm, Arch/x86/x86.asm VMX命令呼び出しにともなう命令列
kernel_stl.cpp ntoskrnl経由でカーネルドライバからSTLを利用する

 エントリポイントから手続き的に書き下されていて,わかりやすい.ハイパーバイザとしての機能もさることながら,STLを強引に呼び出すハックがかっこいい.
 この拡張例には以下がある:

MemoryMon カーネルランドへのコード挿入を検知する
EopMon マルウェアによる特権昇格攻撃を検知する
DdiMon EPTを用いたAPIフック
GuardMon PatchGuardの挙動解析

 いずれもカーネルドライバのエントリポイントで追加機能を初期化するしくみ.
 やや温め納豆は遠くなりにけり.

おわりに

 いずれもSandy Bridge以降のいまどきの環境であれば動作する.
 卒業研究ではQEMUをベースとしたマルウェア解析環境を開発していたのだけど,やはり速度面に難があるし,このあたりの技術を再検討しないとなー.
 なお,今回取り上げなかったハイパーバイザには以下のようなものがある:

MoRE ルートキット文脈のハイパーバイザ.やや古い
HyperBone HyperPlatformと似た機能をもつ.やや古い
Noah 未踏のアレ.OS X上でLinuxバイナリを動かす.MacBook持ってないので試せないのだわ
Bareflank type 1, 2, ドライバいずれの形態のVMMもサポートしたライブラリ.しかもC++で書ける

 なかでもBareflankがヤバいので後編ではこれを読んでいきます.

 本稿はSFC-RG Advent Calendar 2016の4日目である.

はじめに

 あなたは研究の中間発表を終えて,今晩何を食べようか考えている.たしかに準備不足ではあったけれど,研究の前提をいまいち解さないファカルティの高飛車な質問にはうんざりしたし,今日くらいはパーッと気分転換したいものだ.そういうわけで,あなたは⊿館を飛び出して焼肉 ざんまい 湘南台店に行くことにした.

組合せ最適化

 さて,着席し,メニューを開いたあなたはしばし考える.限られた予算,限られた時間,限られた胃袋の容量——いったい何を頼めば最も満足できるだろうか?
 そんなとき,組合せ最適化が役に立つんです.騙されたと思って,メニューを必死に転記してみよう:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import numpy as np, pandas as pd
menu = ['カルビ', '和牛カルビ', '和牛中落ちカルビ', 'ハラミ', '厚切りロース', 'ネギ牛タン塩', '牛タン塩',
'イベリコ豚', 'カシラ', '豚トロ', 'ネギ豚タン塩', '豚タン塩', '厚切りベーコン', 'ウインナー', 'チョリソ',
'ホルモン', 'シロコロホルモン', 'レバー', '白レバー', 'ハツ', 'ミノ',
'お得!! 三種盛り', '本日の塩三種盛り', '本日の味噌三種盛り']
price = [720, 950, 850, 720, 690, 950, 850,
600, 550, 580, 680, 580, 500, 380, 400,
550, 600, 550, 450, 550, 650,
1280, 780, 780]
n = len(menu)
np.random.seed(0)
df = pd.DataFrame({
'品目': menu,
'値段': price,
'満足度': np.random.randint(10, 20, n),
'焼き時間': np.random.randint(5, 10, n),
'量': np.random.randint(10, 20, n),
})
print(df)
値段 品目 満足度 焼き時間
0 720 カルビ 15 9 10
1 950 和牛カルビ 10 8 10
2 850 和牛中落ちカルビ 13 5 14
3 720 ハラミ 13 8 15
4 690 厚切りロース 17 5 15
5 950 ネギ牛タン塩 19 7 16
6 850 牛タン塩 13 8 18
7 600 イベリコ豚 15 5 14
8 550 カシラ 12 6 11
9 580 豚トロ 14 8 14
10 680 ネギ豚タン塩 17 8 19
11 580 豚タン塩 16 8 18
12 500 厚切りベーコン 18 5 11
13 380 ウインナー 18 6 11
14 400 チョリソ 11 6 17
15 550 ホルモン 16 6 19
16 600 シロコロホルモン 17 5 19
17 550 レバー 17 7 13
18 450 白レバー 18 9 16
19 550 ハツ 11 8 17
20 650 ミノ 15 8 12
21 1280 お得!! 三種盛り 19 7 10
22 780 本日の塩三種盛り 18 9 13
23 780 本日の味噌三種盛り 19 7 15

 メニューがpandasのデータフレームになった.取り急ぎ(?)満足度・焼き時間・量は乱数で埋めている.
 あなたはこのメニューの中から,最も満足度の高い組合せを見つけたい.それも,限られた予算,限られた時間,限られた胃袋の容量という条件を満たすような.ここではそのわがままを割当問題として解くことにする.

変数 $x_i \in \{0, 1\}$ i番目の料理を選ぶかどうか
目的関数 $\sum_i{満足度_i x_i}$ $\rightarrow$ 最大
制約条件 $\sum_i{予算_i x_i} \le 12000$ 予算制限
$\sum_i{焼き時間_i x_i} \le 120$ 時間制限
$150 \le \sum_i{量_i x_i} \le 200$ 分量制限

 こういった問題はpulpという整数計画法ライブラリを使って解くことができる:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from pulp import *
m = LpProblem(sense = LpMaximize) # 最大化問題
df['x'] = [LpVariable('x%d' % i, cat = LpBinary) for i in range(n)] # i番目の品目を選択するかしないか
m += lpDot(df.満足度, df.x) # 目的関数:満足度の最大化
m += lpDot(df.値段, df.x) <= 12000 # 制約条件:予算
m += lpDot(df.焼き時間, df.x) <= 120 # 制約条件:焼き時間
m += lpDot(df.量, df.x) >= 150 # 制約条件:量
m += lpDot(df.量, df.x) <= 200 # 制約条件:量
m.solve()
if m.status == 1:
df['val'] = df.x.apply(lambda v: value(v)) # 結果
print(df[df.val == 1].品目)
print('満足度 {}'.format(sum(df[df.val == 1].満足度)))
print('値段 {}'.format(sum(df[df.val == 1].値段)))
>>>
0 カルビ
4 厚切りロース
5 ネギ牛タン塩
7 イベリコ豚
8 カシラ
9 豚トロ
12 厚切りベーコン
13 ウインナー
16 シロコロホルモン
17 レバー
18 白レバー
20 ミノ
21 お得!! 三種盛り
22 本日の塩三種盛り
23 本日の味噌三種盛り
Name: 品目, dtype: object
満足度 251
値段 10060

 はい.順番はともかくとして,これらを食べれば満足できそうだ.
 ここまで考えたところで,あなたは今月の懐具合がよろしくないことを思い出す.なるべく出費を抑えて,それでいてある程度満足できるような品目の組合せはあるだろうか?
 これも同様に割当問題として考えられる.

変数 $x_i \in \{0, 1\}$ i番目の料理を選ぶかどうか
目的関数 $\sum_i{予算_i x_i}$ $\rightarrow$ 最小
制約条件 $\sum_i{満足度_i x_i} \le 200$ 満足度制限
$\sum_i{焼き時間_i x_i} \le 120$ 時間制限
$150 \le \sum_i{量_i x_i} \le 200$ 分量制限
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
m = LpProblem(sense = LpMinimize) # 最小化問題
a['v'] = [LpVariable('v%d' % i, cat = LpBinary) for i in range(n)] # i番目の品目を選択するかしないか
m += lpDot(df.値段, df.x) # 目的関数:予算の最小化
m += lpDot(df.満足度, df.x) >= 200 # 制約条件:満足度
m += lpDot(df.焼き時間, df.x) <= 120 # 制約条件:焼き時間
m += lpDot(df.量, df.x) >= 150 # 制約条件:量
m += lpDot(df.量, df.x) <= 200 # 制約条件:量
m.solve()
if m.status == 1:
df['val'] = df.x.apply(lambda v: value(v)) # 結果
print(df[df.val == 1].品目)
print('満足度 {}'.format(sum(df[df.val == 1].満足度)))
print('値段 {}'.format(sum(df[df.val == 1].値段)))
>>>
7 イベリコ豚
10 ネギ豚タン塩
11 豚タン塩
12 厚切りベーコン
13 ウインナー
14 チョリソ
15 ホルモン
16 シロコロホルモン
17 レバー
18 白レバー
22 本日の塩三種盛り
23 本日の味噌三種盛り
Name: 品目, dtype: object
満足度 200
値段 6850

 200の満足度でいいなら豚ばっか食ってろということらしい.

多腕バンディット問題

 いやいやちょっと待った.乱数でお茶を濁しているけど,あらゆる品目の満足度なんて知らないじゃないか.全品目を食べたことがあるならいざ知らず.それに,毎日同じ店で同じ食事をとるわけでもない.焼肉屋にしたって,湘南台には寅屋にえのもとにとあるわけだ.
 そういうわけで,あなたはいろいろな店に行き,いろいろな注文をして,なるべくどれを頼んでも満足度の高い食事のとれる店を見つけたいと考えた.しかしここで疑念が生まれる——いまの店でそこそこ満足できるなら,別の店を探さなくてもよいのではないか? しかしいまの店に行き続ける限り,別の店の食事を食べることはできないぞ.しかしそこがハズレだとしたら.さてどうしよう——業界用語でこれを探索利用のトレードオフ (exploration-exploitation tradeoff) という.

これまでの学習結果を利用 (exploitation) しようとすると,探索 (exploration) が減ってしまい,機会損失が増えてしまう.一方,探索を増やせば,学習した最良の行動とは異なる行動をとることが増えるため,得られる報酬が減ってしまう.学習した最良の行動との差が,探索のコストということになる.– 牧野,et al. 『これからの強化学習

 このトレードオフを解消する試みを多腕バンディット問題という.多腕バンディット問題は,複数のスロットマシンのレバー(腕)を次々と引いていき,最終的に得られる報酬を最大化するというもので,強化学習の一種といえる.各スロットマシンの報酬はそれぞれ一定の確率分布に従っている.言い換えれば,いろいろな店のメニューにある品目を試していき,最終的に得られる満足度を最大化していく,ということになる.もちろん,品目によって得られる満足度に違いはあるが,なるべく何を食べても満足度の高い店を絞り込んでいきたい.
 そのためのアルゴリズムのうち,ここでは$\epsilon$-GreedyとUCB1を紹介したい.

$\epsilon$-Greedy

 デフォルトで現在最良な選択肢を選ぶが,一定の確率でいま最良と思っていないような選択肢を選びにいく手法.

  • $\epsilon - 1$の確率で最適だと思われる選択肢を利用する
  • $\epsilon / 2$の確率で最適だと思われる選択肢を探索する
  • $\epsilon / 2$の確率で最悪だと思われる選択肢を探索する

 $\epsilon$ Greedyはつまり行きつけの店に入り浸るタイプのアルゴリズムだ.

UCB1

 それもいいが,実はいまの行きつけよりもっといい店なのに,一度行って微妙だったからといって行かないままになっている店がないだろうか? UCB1は,それぞれの店についてどれくらい知っているかを考慮に入れ,なるべく多くの店のことを知ろうとするアルゴリズムだ.具体的には,各店(腕)についてUCB (Upper Confidence Bound) という値を計算する.

 $\overline {x}_{j}+c\sqrt {\dfrac {2\log n} {x_{j}}}$

 ただし$\overline {x}_{j}$$_{j}$番目の店の満足度(報酬)の期待値,$n$はそれまでに店を回った回数の合計,$n_{j}$$_{j}$番目の店に行った回数,$c$は定数.UCB1は,この値が最大になる店に飛び込んでいく.あなたが好奇心旺盛なら,こちらのアルゴリズムを使って考えたほうがいいだろう.
 この手法のメリットとして,ベストでない店に行く回数を確率$1$$O(\log n)$以内に抑えられる.長々とした証明は論文を参照していただくとして,これは店に行く回数が十分大きい場合の理論限界に到達している.デメリットとしては,探索のためによくない店にあたってしまうことが多いという点が挙げられる.

実験

 では,$\epsilon$-GreedyとUCB1が最良の店を選ぶ過程はどうなっているだろうか? 『バンディットアルゴリズムによる最適化手法』のサンプルコードをPython3 + numpy向けに改変して実験.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import random
import math
import seaborn as sns
class BernoulliArm():
def __init__(self, p):
self.p = p
def draw(self):
if random.random() > self.p:
return 0.0
else:
return 1.0
class EpsilonGreedy():
def __init__(self, epsilon, counts, values):
self.epsilon = epsilon
self.counts = counts
self.values = values
return
def initialize(self, n_arms):
self.counts = np.zeros(n_arms)
self.values = np.zeros(n_arms)
return
def select_arm(self):
if random.random() > self.epsilon:
return np.argmax(self.values)
else:
return np.random.randint(0, len(self.values))
def update(self, chosen_arm, reward):
self.counts[chosen_arm] += 1
n = self.counts[chosen_arm]
value = self.values[chosen_arm]
new_value = ((n-1) / float(n)) * value + (1 / float(n)) * reward
self.values[chosen_arm] = new_value
return
class UCB1():
def __init__(self, counts, values):
self.counts = counts
self.values = values
return
def initialize(self, n_arms):
self.counts = np.zeros(n_arms)
self.values = np.zeros(n_arms)
return
def select_arm(self):
n_arms = len(self.counts)
for arm in range(n_arms):
if self.counts[arm] == 0:
return arm
ucb_values = [0.0 for arm in range(n_arms)]
total_counts = sum(self.counts)
for arm in range(n_arms):
bonus = math.sqrt((2 * math.log(total_counts)) / float(self.counts[arm]))
ucb_values[arm] = self.values[arm] + bonus
return np.argmax(ucb_values)
def update(self, chosen_arm, reward):
self.counts[chosen_arm] = self.counts[chosen_arm] + 1
n = self.counts[chosen_arm]
value = self.values[chosen_arm]
new_value = ((n - 1) / float(n)) * value + (1 / float(n)) * reward
self.values[chosen_arm] = new_value
return
def test_algorithm(algo, arms, num_sims, horizon):
chosen_arms = np.zeros(num_sims * horizon)
rewards = np.zeros(num_sims * horizon)
cumulative_rewards = np.zeros(num_sims * horizon)
sim_nums = np.zeros(num_sims * horizon)
times = np.zeros(num_sims * horizon)
for sim in range(num_sims):
sim += 1
algo.initialize(len(arms))
for t in range(horizon):
t += 1
index = (sim - 1) * horizon + t - 1
sim_nums[index] = sim
times[index] = t
chosen_arm = algo.select_arm()
chosen_arms[index] = chosen_arm
reward = arms[chosen_arm].draw()
rewards[index] = reward
if t == 1:
cumulative_rewards[index] = reward
else:
cumulative_rewards[index] = cumulative_rewards[index - 1] + reward
algo.update(chosen_arm, reward)
return [sim_nums, times, chosen_arms, rewards, cumulative_rewards]
means = np.array([0.1, 0.1, 0.1, 0.1, 0.9])
n_arms = len(means) # 腕は5本
random.shuffle(means)
arms = [BernoulliArm(x) for x in means]
for epsilon in [0.1, 0.2, 0.3, 0.4, 0.5]: # epsilonを変えたらどうなるか?
algo = EpsilonGreedy(epsilon, [], [])
algo.initialize(n_arms)
results = test_algorithm(algo, arms, 5000, 500)
df = pd.DataFrame({"times": results[1], "rewards": results[3]})
grouped = df["rewards"].groupby(df["times"])
plt.plot(grouped.mean(), label="epsilon="+str(epsilon))
algo = UCB1([], [])
algo.initialize(n_arms)
results = test_algorithm(algo, arms, 5000, 500)
df = pd.DataFrame({"times": results[1], "rewards": results[3]})
grouped = df["rewards"].groupby(df["times"])
plt.plot(grouped.mean(), label="UCB1")
plt.legend(loc="best")
plt.show()

 好奇心旺盛な人は序盤それなりに苦労することがわかる.

おわりに

 こうしたナイーブな実装で焼肉の頼み方を最適化したり,店の巡り方を最適化できるかどうかというとまあ実際微妙(たとえば焼肉の各品目から得られる満足度は,限界効用逓減の法則に従ってすり減っていく)だが,日々の意思決定をアルゴリズム的に考えてみる遊びはそれなりにおもしろい.ソーシャルゲームにどう課金するか,というのもこの俎上に載せられる.
 『Algorithms to Live By: The Computer Science of Human Decisions』という本はそういう,情報系の人間がよくやる与太話をまじめに考察したものだ——書類はどのキャッシュアルゴリズムに従って並べるべきかとか.先に挙げた探索と利用のトレードオフについても述べられている.YouTubeで著者らの講演を観てほしい.

 はい.

参考文献

追記 (2016.12.06)

 今回のような設定では多腕バンディット問題というより最適腕識別として考えたほうがよさそう.多腕バンディット問題は累積報酬の最大化が目的だけれど,最適腕識別はどの腕が最良か発見するのが目的.将来の報酬が最大の腕を見つける,ということ.『バンディット問題の理論とアルゴリズム』を読めばいいんだとか.

はじめに

 サイバーセキュリティに携わる者なら一度くらいはKDD Cup 99 Dataなるデータセットの名を耳にしたことがあるのではないだろうか.KDD Cupは国際会議SIGKDDによるデータマイニングのコンペで,KDD Cup 99 Dataはそのためのネットワーク侵入検知にまつわるデータ.正常通信と攻撃を分類するタスクが与えられた.
 見てみよう.

データセットの構成

 データは現在,カリフォルニア大学アーバイン校によって配布されている.
 それぞれのファイル内容は下記の通り:

ファイル名 ファイル内容
kddcup.data フルデータ
kddcup.data_10_percent フルデータの10%を抽出した学習用データ
corrected 正常・攻撃のラベル付けがなされた評価用データ
kddcup.testdata.unlabeled 正常・攻撃のラベル付けがなされていないデータ
kddcup.testdata.unlabeled_10_percent 正常・攻撃のラベル付けがなされていないデータの10%サブセット
kddcup.newtestdata_10_percent_unlabeled 正常・攻撃のラベル付けがなされていないデータの10%サブセット

 ファイルの中身はこんな調子:

1
2
3
4
5
6
7
8
9
$ wget -r -l 1 http://kdd.ics.uci.edu/databases/kddcup99/
$ gunzip -r kdd.ics.uci.edu/kddcup99
$ ln -s kdd.ics.uci.edu/databases/kddcup99 kddcup99
$ head -5 kddcup99/kddcup.data
0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,162,4528,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,1,1,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,236,1228,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,2,2,1.00,0.00,0.50,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,233,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,3,3,1.00,0.00,0.33,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,3,3,0.00,0.00,0.00,0.00,1.00,0.00,0.00,4,4,1.00,0.00,0.25,0.00,0.00,0.00,0.00,0.00,normal.

 これは,ファイルの特徴をカンマ区切りで列挙したもの.列に特徴名を振れば,pandas(や,scikit-learn)での扱いも楽.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
>>> import pandas
>>>
>>> col_names = ["duration","protocol_type","service","flag","src_bytes",
... "dst_bytes","land","wrong_fragment","urgent","hot","num_failed_logins",
... "logged_in","num_compromised","root_shell","su_attempted","num_root",
... "num_file_creations","num_shells","num_access_files","num_outbound_cmds",
... "is_host_login","is_guest_login","count","srv_count","serror_rate",
... "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
... "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
... "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
... "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
... "dst_host_rerror_rate","dst_host_srv_rerror_rate","label"]
>>>
>>> kdd_data_10percent = pandas.read_csv("kddcup99/kddcup.data_10_percent", header=None, names = col_names)

 学習用データは通常データ,22種類の攻撃データの計23種類のデータからなる.評価用データは,学習用データに加えて17種類の攻撃データを含んでいる.これらの攻撃は4つのクラスに大別される:

クラス 説明 サブクラス
Normal 通常のコネクション normal
Probe 攻撃対象の探索・調査 ipsweep, nmap, postsweep, satan, mscan, saint
DoS DoS攻撃 back, land, neptune, pod, smurf, teardrop, mailbomb, apache2, processtable, udpstorm
U2R ローカルマシンからrootへの許可されていないアクセス buffer.overflow, localmodule, perl, rootkit, httptunnel, xterm, ps, worm
R2L リモートマシンからの許可されていないアクセス ftp_write, guess_passwd, imap, multihop, phf, spy, warezclient, warezmaster, snmpgetattack, snmpguess, xsnoop, named, sendmail, sqlattack, xlock

 サブクラス名から古臭さがにじんでいるが,データセットは近年の研究でも広く用いられているものだ.まあ単純に同規模の新しいデータセットがないからだろう.
 だいたいの論文はkddcup.data_10_percentを用いて学習し,correctedを用いて評価するという流れになっている.
 ところで,このデータセットに付随したタスクは正常通信と攻撃の二値(まあ多値分類になりはするが)分類だが,これは異常検知とどう違うのだろうか.ものの本によると,二値分類はベイズ決定則(条件付き分布$p(x|y=1)p(y=1)$$p(x|y=0)p(y=0)$の比が1を超えたら異常と判定),異常検知はネイマン・ピアソン決定則($p(x|y=1)$$p(x|y=0)$の比がある閾値を超えたら異常と判定)にもとづいている.ここで,異常検知問題ではほとんどつねに$p(y=1)<<p(y=0)$であるため,ベイズ決定則は異常判定を強く抑制する.そういうわけで,標本の割合を吟味して二値分類器の閾値をスライドさせていくことが重要となってくるらしい.

データセット形式への変換

 tcpdump2gureKDDCup99は,Bro IDSのプラグインとして,おなじみのpcapファイルをKDD Cup 99 Dataのフォーマットに変換してくれる.Bro IDSはSnortには及ばずともそこそこ由緒あるIDSで,GitHubリポジトリは米国政府公式のリポジトリリストにも掲載されている.
 まずBro IDSをインストールする.

1
2
3
4
5
6
$ sudo apt-get install cmake make gcc g++ flex bison libpcap-dev libssl-dev python-dev swig zlib1g-dev
$ git clone --recursive git://git.bro.org/bro
$ cd bro
$ ./configure
$ make -j4
$ sudo make install

 私はpyenvからインストールしたPython 2.7.11を常用しているのだが,Bro IDSをmakeするにはCFLAGS="-fPIC" pyenv install 2.7.11のように共有ライブラリ用のオプションをつけてPythonを再インストールする必要があった.
 さてWireshark公式が公開しているサンプルデータをKDD Cup 99 Dataの形式に変換してみる.

1
2
3
4
5
6
7
8
9
10
11
$ cd
$ git clone git@github.com:inigoperona/tcpdump2gureKDDCup99
$ gcc tcpdump2gureKDDCup99/trafAld.c -o tcpdump2gureKDDCup99/trafAld.out
$ wget "https://wiki.wireshark.org/SampleCaptures?action=AttachFile&do=get&target=zlip-1.pcap" -O zlip-1.pcap
$ bro -r zlip-1.pcap tcpdump2gureKDDCup99/darpa2gurekddcup.bro > conn.list
$ cat conn.list
1 955453631.643199 1024 53 10.0.0.1 146.84.28.88 0.000000 udp 53 S0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
$ sort -n conn.list > conn_sort.list
$ tcpdump2gureKDDCup99/trafAld.out conn_sort.list
$ cat trafAld.list
1 955453631.643199 1024 53 10.0.0.1 146.84.28.88 0.000000 udp 53 S0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0 0 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000

 pcapファイルをPythonで分析したければそのまま(scapyや)pandasに突っ込むだろうけど,KDD Cup 99 Dataと比較したい場合には使えるかも.

おわりに

 名前は聞くけど触ったことないなということで.やらなければならない作業が進まないとこういう現実逃避が捗る捗る.

参考文献

 はい.

Xming

 XmingはWindows向けX Window System実装で,WindowsでX11 Forwardingをする場合のデファクトスタンダード.さて,日本語版のWindowsでXmingを利用していると,時折勝手に入力言語が英語に変わってしまうことがある.でまあOSSなのでソースコードを書き換えればいいんだけど,バイナリを読んだほうが早そう,ということでそのようにした.

原因

 IDA Proでキーボード関連のAPIの呼び出し元を眺めると,LoadKeyboardLayoutのロケールID引数を英語 (0x0409) に設定していることがわかる.これを日本語 (0x0411) にすればよい.

 素直に.rdataセクションに載っているので,そのまま書き換えられる.

 紀元前のバイナリパッチ方式で書くと下記の通り:

1
2
3
4
19AFAA: 30 31
19AFAB: 39 31
19AFB5: 55 4A
19AFB6: 53 50

 はい.

追記 (2016.11.07)

 XmingのバージョンはPublic Domain Releases 6.9.0.31で,より新しい有償版で修正されているのかどうかは知らない.で,6.9.0.31の当該ソースコード (xc/programs/Xserver/hw/xwin/winconfig.c) にはなにやら不吉なコメントが記されているが,見なかったことにする.うちの環境では,レジストリ設定でCaps LockをCtrlに入れ替えてるし.

1
2
3
4
5
6
7
8
9
10
if (keyboardType == 7)
{
/* Japanese layouts have problems with key event messages
such as the lack of WM_KEYUP for Caps Lock key.
Loading US layout fixes this problem. */
if (LoadKeyboardLayout("00000409", KLF_ACTIVATE) != NULL)
winMsg (X_INFO, "Loading US keyboard layout.\n");
else
winMsg (X_ERROR, "LoadKeyboardLaout failed.\n");
}

 ご自身の責任でやっていってください.

はじめに

 今年のセキュリティ・キャンプでは,うっかり「なぜマルウェア解析は自動化できないのか」という題の講義を行ってしまったが,それだけセキュリティの世界には自動化の波が来ている.本稿では,脆弱性分析の自動化をめざして開発されているangr, AFL, Drillerをざっくり紹介する.

angr

 angrはUCSBの研究チームにしてCTFチームShellphishを中心に開発されているバイナリ解析フレームワーク.論文[PDF]はIEEE S&P 2016に採択されている.手法の新規性というよりは実装力でゴリ押しするタイプ.評価には,アメリカ国防高等研究計画局が5,500万ドル(約56億円)の資金を投じてまで開催した脆弱性分析・修正の自動化コンペ,DARPA Cyber Grand Challenge (CGC) のデータセットが用いられている.CGCの決勝戦に進出したチームには75万ドル(約7,600万円),優勝したチームは200万ドル(約2億円)が与えられる.angr開発の目的のひとつが,CGCでの勝利にあることは疑いようもない——最終的な戦績は,CMUのツールMAYHEMに優勝を譲って3位だったが.
 さて,angrはシンボリック実行やプログラムスライシングを通して,プログラムの特定位置に到達するための入力値を抽出することができる.次のコードで雰囲気をつかめるだろうか:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import sys
import angr
import simuvex
# 解析対象を指定
p = angr.Project(sys.argv[1])
# 制御フローグラフの生成
cfg = p.analyses.CFG()
print [x for x in cfg.functions.iteritems()]
# シンボルを取得
target_addr = p.loader.main_bin.get_symbol("main").addr
# パス分析クラスのインスタンス
pg = p.factory.path_group()
# シンボルへのパスを分析
pg.explore(find = target_addr)
# avoidを避け,findに到達する入力値を探索してくれる
a = p.surveyors.Explorer(find = FIND_ADDR, avoid = AVOID_ADDR).run()
# フック
p.hook_symbol('strlen', simuvex.SimProcedures['stubs']['ReturnUnconstrained'])
# 実行結果のダンプ
a.found[0].state.posix.dumps(1)

 解析対象の規模が大きい場合,エントリポイントを起点とした解析に時間を要するあるいは失敗することがあるが,プログラムの途中から解析を始めることも可能だ.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
p = angr.Project(sys.argv[1])
# 解析の起点となるアドレス
state = p.factory.blank_state(addr = START_ADDR)
# その地点までプログラムを実行したときのスタックの状態
state.regs.ebp = BASE_ADDR
state.regs.esp = STACK_ADDR
# 入力値の設定
for i in range(INPUT_LENGTH):
s = state.se.BVS('Var[{}]'.format(i), 32, explicit_name = True)
state.memory.store(INPUT_ADDR + i * 4, s)
path = p.factory.path(state)
a = p.surveyors.Explorer(start = path, find= FIND_ADDR, avoid= AVOID_ADDR)
a.found[0].state.posix.dumps(1)

 シンボリック実行はSSA形式の中間表現を前提とするが,angrはValgrindのVEX IRを用いている.バックエンドのSMTソルバはZ3だが,claripyという自前のラッパが噛ませてある.
 これ以上の説明は公式ドキュメントに譲ろう.

AFL

 AFL (American Fuzzy Lop) はGoogleのエンジニアlcamtufを中心に開発されているファジングツール.遺伝的アルゴリズムによって正常な入力を次々と変異させていき,自動的にプログラムのバグを検出する.AFLにはbashやtcpdump, OpenSSHといったさまざまなソフトウェアのバグを検出した実績があり,いまや脆弱性分析の研究になくてはならない存在だ.一般的なファジングはプログラムの浅い箇所にしか到達できない.AFLは,大量の正常な入力をトリミングしてシードとするコーパス蒸留 (corpus distillation) と,遺伝的アルゴリズムを用いてシードを変異させるGAFuzzingのいいとこ取りを図ったものだ.その実行フローは次のようになる:

  1. ユーザーから与えられた初期テストケースをキューに入れる
  2. キューから次の入力テストケースをひとつ取り出し,
  3. プログラムの振る舞いに影響を与えない最小サイズにトリミングする
  4. バランスのよい探索となるよう,さまざまな変異戦略を用いて入力を繰り返し変異させる
  5. 新たな状態遷移が計測されたら,出力をキューに入れる
  6. 2に戻る

 ここでAFLはカバレッジ計測のため,解析対象のプログラムに計測用のコードを埋め込む.これには,解析対象のソースコードが手元にある場合gccやclangの独自フロントエンドが,解析対象のソースコードが手元にない場合QEMUが用いられる.
 ファジング機能の中核は,afl-fuzz.cmain()から呼び出されるfuzz_one()にある.実装されている変異戦略は次の通り:

  • SIMPLE BITFLIP
  • ARITHMETIC INC/DEC
  • INTERESTING VALUES
  • DICTIONARY STUFF
  • RANDOM HAVOC
  • SPLICING

 CGCで優勝したCMUのMAYHEMは,このAFLにシンボリック実行機能を追加していたようだ.MAYHEMといえば同名のシンボリック実行エンジンの論文[PDF]がIEEE S&P 2012で発表されているが,当時からの変更点がどれほどなのかはわからない.
 また,AFLの拡張に,解析対象のパスを枝刈りすることで高速化を図ったAFLFastがある.これもCGC決勝進出チームによるもので,論文[PDF]はACM CCS 2016に採択されている.600行程度のパッチでトップカンファレンス.ちょっと妬ましい.

Driller

 Drillerはangrの開発陣によるAFLの拡張.論文[PDF]はNDSS 2016に採択されている.AFLのREADMEには次のようにある:

Other, more sophisticated research has focused on techniques such as program flow analysis (“concolic execution”), symbolic execution, or static analysis. All these methods are extremely promising in experimental settings, but tend to suffer from reliability and performance problems in practical uses - and currently do not offer a viable alternative to “dumb” fuzzing techniques.

 シンボリック(コンコリック)実行は実用的ではないと切って捨てている.DrillerはangrとAFLを組み合わせることで,この批判を克服し,ファジングとシンボリック実行のいいとこ取りを図ったものだ:

 たとえば,次のコードの解析には,ファジングよりシンボリック実行が適している:

1
2
3
4
5
6
7
int main(void)
{
int x;
read(0, &x, sizeof(x));
if(x == 0x123ABCD)
vulnerable();
}

 なぜなら,ファジングによって0x123ABCDという値を生成するには大量の試行を必要とするからだ.一方で,次のコードの解析には,シンボリック実行よりファジングが適している:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int check(char *x, int depth)
{
if(depth >= 100){
return 0;
}else{
int count = (*x == 'B') ? 1 : 0;
count += check(x+1, depth+1);
return count;
}
}
int main(void)
{
char x[100];
read(0, x, 100);
if(check(x, 0) == 25)
vulnerable();
}

 なぜなら,単純なシンボリック実行ではパス爆発を解決できないからだ.
 Drillerは両手法の長所と短所を踏まえた上で,それらを堅実に組み合わせている.やはりハッシュ関数の充足はDrillerをもってしても難しいようだが,現行MAYHEMもAFLとシンボリック実行を併用しているようだし,このアプローチが現在の着地点なのだろう.

おわりに

 CGCとそれにともなう研究によって,脆弱性分析の自動化手法は大きな進歩を遂げつつあることが伝わっただろうか.学会のOSS重視傾向も相まって,さまざまなツールを手元で試せるようになってきている.大変ありがたいことだ.

 ※Rackサーバの方のUnicornではない.

はじめに

 UnicornQEMUベースの軽量,マルチプラットフォーム・マルチアーキテクチャ・JIT対応のCPUエミュレータ.周辺機器をエミュレーションしないため用途は限られるが,GoやPythonなど複数言語のバインディングを備えている.現在システムセキュリティ分野で最も注目されているOSSのひとつと言っても過言ではなく,AsiaCCS 2016で発表されたROPチェーン解析ツールROPMEMUや,遺伝的アルゴリズムによってROPチェーンを自動生成するツールroperのバックエンドとして用いられたり,Unicornと同じように注目を集めているバイナリ解析ツールangrとの連携が進められたりと,一大コミュニティを形成しつつある.
 コンセプトの説明はBlack Hat USA 2015の発表スライドに譲るとして,ここではその利用方法と内部実装にふれる.

Pythonバインディング

 情報セキュリティに興味があり,セキュリティ関係の仕事に携わりたいという人には,Pythonはうってつけの言語だ.その理由は,リバースエンジニアリングと攻撃コード作成のためのライブラリが充実しているためだ.
       – Justin Seitz『サイバーセキュリティプログラミング———Pythonで学ぶハッカーの思考』序文

 リバースエンジニアリングというタスクにUnicornを用いることを考える.他言語を選ぶ積極的な理由やPythonへのアレルギーがなければ,他のライブラリとの連携も考慮して,Pythonバインディングを活用すべきだろう.

エミュレーション

 最もシンプルな用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# unicorn/bindings/python/sample_x86.pyを抜粋・改変
from __future__ import print_function # Python 2.7を利用
from unicorn import *
from unicorn.x86_const import *
# エミュレーション対象の機械語
X86_CODE32 = b"\x41\x4a" # INC ecx; DEC edx
# 各基本ブロックに対するコールバック
def hook_block(uc, address, size, user_data):
print(">>> Tracing basic block at 0x%x, block size = 0x%x" %(address, size))
# 各命令に対するコールバック
def hook_code(uc, address, size, user_data):
print(">>> Tracing instruction at 0x%x, instruction size = %u" %(address, size))
# x86 32bitのコードをエミュレーション
def test_i386():
print("Emulate i386 code")
try:
# x86-32bitモードでエミュレータを初期化
mu = Uc(UC_ARCH_X86, UC_MODE_32)
# エミュレーション用に2MBのメモリを割り当て
mu.mem_map(ADDRESS, 2 * 1024 * 1024)
# 割り当てられたメモリに機械語を書き込み
mu.mem_write(ADDRESS, X86_CODE32)
# レジスタ初期化
mu.reg_write(UC_X86_REG_ECX, 0x1234) # エミュレーション中に加算される
mu.reg_write(UC_X86_REG_EDX, 0x7890) # エミュレーション中に減算される
# 各基本ブロックに対するコールバックを設定
mu.hook_add(UC_HOOK_BLOCK, hook_block)
# 各命令に対するコールバックを設定
mu.hook_add(UC_HOOK_CODE, hook_code)
# エミュレーション開始
mu.emu_start(ADDRESS, ADDRESS + len(X86_CODE32))
# レジスタの表示
print(">>> Emulation done. Below is the CPU context")
r_ecx = mu.reg_read(UC_X86_REG_ECX)
r_edx = mu.reg_read(UC_X86_REG_EDX)
print(">>> ECX = 0x%x" %r_ecx)
print(">>> EDX = 0x%x" %r_edx)
# メモリから命令列を読む
tmp = mu.mem_read(ADDRESS, 2)
print(">>> Read 2 bytes from [0x%x] =" %(ADDRESS), end="")
for i in tmp:
print(" 0x%x" %i, end="")
print("")
except UcError as e:
print("ERROR: %s" % e)
if __name__ == '__main__':
test_i386()

 これを実行すると次のような出力が得られる.

1
2
3
4
5
6
7
8
Emulate i386 code
>>> Tracing basic block at 0x1000000, block size = 0x2
>>> Tracing instruction at 0x1000000, instruction size = 1
>>> Tracing instruction at 0x1000001, instruction size = 1
>>> Emulation done. Below is the CPU context
>>> ECX = 0x1235
>>> EDX = 0x788f
>>> Read 2 bytes from [0x1000000] = 0x41 0x4a

 はい.
 なおエミュレーション時に各命令のトレースを得たければ,同じ開発陣による逆アセンブラCapstoneのPythonバインディングを併用して (詳細は割愛) 次のようなフックを用意すればよい:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
from capstone import *
...
class SimpleEngine:
def __init__(self):
self.capmd = Cs(CS_ARCH_X86, CS_MODE_32) # アーキテクチャ指定
def disas_single(self, data):
for i in self.capmd.disasm(data, 16): # 逆アセンブル
print("\t%s\t%s" % (i.mnemonic, i.op_str))
break
disasm = SimpleEngine()
...
def hook_code(uc, address, size, user_data):
print(">>> Tracing instruction at 0x%x, instruction size = %u, " %(address, size), end="")
# メモリから実行される命令を読む
ins = uc.mem_read(address, size)
disasm.disas_single(str(ins))

 実行すると命令のトレースが得られる.

1
2
3
4
...
>>> Tracing instruction at 0x1000000, instruction size = 1, inc ecx
>>> Tracing instruction at 0x1000001, instruction size = 1, dec edx
...

 このように,Unicornではプラグイン側のエントリポイントからQEMUの機能を呼び出していくことになる.

Pythonバインディングの内部

 この内部では,ctypesによる共有ライブラリのロードが行われている.unicorn/bindings/python/unicorn/unicorn.pyを見よう:

1
2
3
4
5
6
7
8
9
10
11
12
13
for _lib in _all_libs:
try:
if _lib == "unicorn.dll":
for dll in _all_windows_dlls: # load all the rest DLLs first
_lib_file = os.path.join(_lib_path, dll)
if os.path.exists(_lib_file):
ctypes.cdll.LoadLibrary(_lib_file)
_lib_file = os.path.join(_lib_path, _lib)
_uc = ctypes.cdll.LoadLibrary(_lib_file)
_found = True
break
except OSError:
pass

 ここで_ucunicorn.dll (環境によって異なるがいずれにせよ共有ライブラリ) がロードされている.
 さて,これまで利用してきた関数のプロトタイプ宣言はunicorn/bindings/python/unicorn/unicorn.pyにある:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# setup all the function prototype
def _setup_prototype(lib, fname, restype, *argtypes):
getattr(lib, fname).restype = restype
getattr(lib, fname).argtypes = argtypes
ucerr = ctypes.c_int
uc_engine = ctypes.c_void_p
uc_hook_h = ctypes.c_size_t
_setup_prototype(_uc, "uc_version", ctypes.c_uint, ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_int))
_setup_prototype(_uc, "uc_arch_supported", ctypes.c_bool, ctypes.c_int)
_setup_prototype(_uc, "uc_open", ucerr, ctypes.c_uint, ctypes.c_uint, ctypes.POINTER(uc_engine))
_setup_prototype(_uc, "uc_close", ucerr, uc_engine)
_setup_prototype(_uc, "uc_strerror", ctypes.c_char_p, ucerr)
_setup_prototype(_uc, "uc_errno", ucerr, uc_engine)
_setup_prototype(_uc, "uc_reg_read", ucerr, uc_engine, ctypes.c_int, ctypes.c_void_p)
_setup_prototype(_uc, "uc_reg_write", ucerr, uc_engine, ctypes.c_int, ctypes.c_void_p)
_setup_prototype(_uc, "uc_mem_read", ucerr, uc_engine, ctypes.c_uint64, ctypes.POINTER(ctypes.c_char), ctypes.c_size_t)
_setup_prototype(_uc, "uc_mem_write", ucerr, uc_engine, ctypes.c_uint64, ctypes.POINTER(ctypes.c_char), ctypes.c_size_t)
_setup_prototype(_uc, "uc_emu_start", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_size_t)
_setup_prototype(_uc, "uc_emu_stop", ucerr, uc_engine)
_setup_prototype(_uc, "uc_hook_del", ucerr, uc_engine, uc_hook_h)
_setup_prototype(_uc, "uc_mem_map", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_uint32)
_setup_prototype(_uc, "uc_mem_map_ptr", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_uint32, ctypes.c_void_p)
_setup_prototype(_uc, "uc_mem_unmap", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_size_t)
_setup_prototype(_uc, "uc_mem_protect", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_uint32)
_setup_prototype(_uc, "uc_query", ucerr, uc_engine, ctypes.c_uint32, ctypes.POINTER(ctypes.c_size_t))
# uc_hook_add is special due to variable number of arguments
_uc.uc_hook_add = _uc.uc_hook_add
_uc.uc_hook_add.restype = ucerr
UC_HOOK_CODE_CB = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_void_p)
UC_HOOK_MEM_INVALID_CB = ctypes.CFUNCTYPE(
ctypes.c_bool, uc_engine, ctypes.c_int,
ctypes.c_uint64, ctypes.c_int, ctypes.c_int64, ctypes.c_void_p
)
UC_HOOK_MEM_ACCESS_CB = ctypes.CFUNCTYPE(
None, uc_engine, ctypes.c_int,
ctypes.c_uint64, ctypes.c_int, ctypes.c_int64, ctypes.c_void_p
)
UC_HOOK_INTR_CB = ctypes.CFUNCTYPE(
None, uc_engine, ctypes.c_uint32, ctypes.c_void_p
)
UC_HOOK_INSN_IN_CB = ctypes.CFUNCTYPE(
ctypes.c_uint32, uc_engine, ctypes.c_uint32, ctypes.c_int, ctypes.c_void_p
)
UC_HOOK_INSN_OUT_CB = ctypes.CFUNCTYPE(
None, uc_engine, ctypes.c_uint32,
ctypes.c_int, ctypes.c_uint32, ctypes.c_void_p
)
UC_HOOK_INSN_SYSCALL_CB = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_void_p)

 メモリ・レジスタを読み書きできることがわかる.
 さきほどは基本ブロック・命令単位のコールバック関数——フック——を設置した.その他のフックも次のように書ける:

1
2
3
4
5
6
7
8
9
10
11
12
mu.hook_add(UC_HOOK_BLOCK, hook_block) # 基本ブロック単位
mu.hook_add(UC_HOOK_CODE, hook_code) # 命令単位
mu.hook_add(UC_HOOK_CODE, hook_code, None, ADDRESS, ADDRESS+20) # 指定範囲[ADDRESS, ADDRESS+20]の全命令
mu.hook_add(UC_HOOK_INSN, hook_in, None, 1, 0, UC_X86_INS_IN) # IN命令
mu.hook_add(UC_HOOK_INSN, hook_out, None, 1, 0, UC_X86_INS_OUT) # OUT命令
mu.hook_add(UC_HOOK_MEM_WRITE, hook_mem_access) # メモリ書き込み
mu.hook_add(UC_HOOK_MEM_READ, hook_mem_access) # メモリ読み込み
mu.hook_add(UC_HOOK_MEM_READ | UC_HOOK_MEM_WRITE, hook_mem_access) # その両方
mu.hook_add(UC_HOOK_MEM_READ_UNMAPPED | UC_HOOK_MEM_WRITE_UNMAPPED, hook_mem_invalid) # 無効なメモリアクセス
mu.hook_add(UC_HOOK_INSN, hook_syscall, None, 1, 0, UC_X86_INS_SYSCALL) # システムコール
mu.hook_add(UC_HOOK_INTR, hook_intr) # 割り込み
...

 こうしたフックや,これまで用いてきたレジスタやメモリの読み書き・エミュレーションといった機能はUcクラスのメソッドとして用意されている:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class Uc(object):
def __init__(self, arch, mode):
# verify version compatibility with the core before doing anything
(major, minor, _combined) = uc_version()
if major != uc.UC_API_MAJOR or minor != uc.UC_API_MINOR:
self._uch = None
# our binding version is different from the core's API version
raise UcError(uc.UC_ERR_VERSION)
self._arch, self._mode = arch, mode
self._uch = ctypes.c_void_p()
status = _uc.uc_open(arch, mode, ctypes.byref(self._uch))
if status != uc.UC_ERR_OK:
self._uch = None
raise UcError(status)
# internal mapping table to save callback & userdata
self._callbacks = {}
self._ctype_cbs = {}
self._callback_count = 0
...
# emulate from @begin, and stop when reaching address @until
def emu_start(self, begin, until, timeout=0, count=0):
status = _uc.uc_emu_start(self._uch, begin, until, timeout, count)
if status != uc.UC_ERR_OK:
raise UcError(status)
...
# return the value of a register
def reg_read(self, reg_id):
if self._arch == uc.UC_ARCH_X86:
if reg_id in [x86_const.UC_X86_REG_IDTR, x86_const.UC_X86_REG_GDTR, x86_const.UC_X86_REG_LDTR, x86_const.UC_X86_REG_TR]:
reg = uc_x86_mmr()
status = _uc.uc_reg_read(self._uch, reg_id, ctypes.byref(reg))
if status != uc.UC_ERR_OK:
raise UcError(status)
return reg.selector, reg.base, reg.limit, reg.flags
if reg_id in range(x86_const.UC_X86_REG_FP0, x86_const.UC_X86_REG_FP0+8):
reg = uc_x86_float80()
status = _uc.uc_reg_read(self._uch, reg_id, ctypes.byref(reg))
if status != uc.UC_ERR_OK:
raise UcError(status)
return reg.mantissa, reg.exponent
# read to 64bit number to be safe
reg = ctypes.c_int64(0)
status = _uc.uc_reg_read(self._uch, reg_id, ctypes.byref(reg))
if status != uc.UC_ERR_OK:
raise UcError(status)
return reg.value
...
# read data from memory
def mem_read(self, address, size):
data = ctypes.create_string_buffer(size)
status = _uc.uc_mem_read(self._uch, address, data, size)
if status != uc.UC_ERR_OK:
raise UcError(status)
return bytearray(data)
...
# add a hook
def hook_add(self, htype, callback, user_data=None, begin=1, end=0, arg1=0):
_h2 = uc_hook_h()
# save callback & user_data
self._callback_count += 1
self._callbacks[self._callback_count] = (callback, user_data)
cb = None
if htype == uc.UC_HOOK_INSN: # 特定の命令に応じた内部処理
insn = ctypes.c_int(arg1)
if arg1 == x86_const.UC_X86_INS_IN: # IN命令
cb = ctypes.cast(UC_HOOK_INSN_IN_CB(self._hook_insn_in_cb), UC_HOOK_INSN_IN_CB)
if arg1 == x86_const.UC_X86_INS_OUT: # OUT命令
cb = ctypes.cast(UC_HOOK_INSN_OUT_CB(self._hook_insn_out_cb), UC_HOOK_INSN_OUT_CB)
if arg1 in (x86_const.UC_X86_INS_SYSCALL, x86_const.UC_X86_INS_SYSENTER): # SYSCALL/SYSENTER命令
cb = ctypes.cast(UC_HOOK_INSN_SYSCALL_CB(self._hook_insn_syscall_cb), UC_HOOK_INSN_SYSCALL_CB)
status = _uc.uc_hook_add(
self._uch, ctypes.byref(_h2), htype, cb,
ctypes.cast(self._callback_count, ctypes.c_void_p),
ctypes.c_uint64(begin), ctypes.c_uint64(end), insn
)
elif htype == uc.UC_HOOK_INTR: # 割り込み
cb = ctypes.cast(UC_HOOK_INTR_CB(self._hook_intr_cb), UC_HOOK_INTR_CB)
status = _uc.uc_hook_add(
self._uch, ctypes.byref(_h2), htype, cb,
ctypes.cast(self._callback_count, ctypes.c_void_p),
ctypes.c_uint64(begin), ctypes.c_uint64(end)
)
else:
if htype in (uc.UC_HOOK_BLOCK, uc.UC_HOOK_CODE): # 基本ブロック・命令単位
# set callback with wrapper, so it can be called
# with this object as param
cb = ctypes.cast(UC_HOOK_CODE_CB(self._hookcode_cb), UC_HOOK_CODE_CB)
status = _uc.uc_hook_add(
self._uch, ctypes.byref(_h2), htype, cb,
ctypes.cast(self._callback_count, ctypes.c_void_p),
ctypes.c_uint64(begin), ctypes.c_uint64(end)
)
elif htype & (uc.UC_HOOK_MEM_READ_UNMAPPED | # 無効なメモリアクセス
uc.UC_HOOK_MEM_WRITE_UNMAPPED |
uc.UC_HOOK_MEM_FETCH_UNMAPPED |
uc.UC_HOOK_MEM_READ_PROT | # 保護された領域へのメモリアクセス
uc.UC_HOOK_MEM_WRITE_PROT |
uc.UC_HOOK_MEM_FETCH_PROT):
cb = ctypes.cast(UC_HOOK_MEM_INVALID_CB(self._hook_mem_invalid_cb), UC_HOOK_MEM_INVALID_CB)
status = _uc.uc_hook_add(
self._uch, ctypes.byref(_h2), htype, cb,
ctypes.cast(self._callback_count, ctypes.c_void_p),
ctypes.c_uint64(begin), ctypes.c_uint64(end)
)
else: # メモリ読み書き
cb = ctypes.cast(UC_HOOK_MEM_ACCESS_CB(self._hook_mem_access_cb), UC_HOOK_MEM_ACCESS_CB)
status = _uc.uc_hook_add(
self._uch, ctypes.byref(_h2), htype, cb,
ctypes.cast(self._callback_count, ctypes.c_void_p),
ctypes.c_uint64(begin), ctypes.c_uint64(end)
)
# save the ctype function so gc will leave it alone.
self._ctype_cbs[self._callback_count] = cb
if status != uc.UC_ERR_OK:
raise UcError(status)
return _h2.value

おわりに

 PythonからUnicornを利用する方法を紹介した.
 結局のところctypesでラップされており,こうした機能がどのようにして実行されるのかは本体のコードを読まなければ理解できない.そういうわけで,そのうちUnicorn本体の実装を読んでいくことにする.