API解析
在现代恶意软件分析中,动态解析API是了解样本行为的关键手段之一。Qakbot采用了动态API解析技术,这种技术使得恶意代码的行为更加隐蔽,绕过了静态分析工具对已知API的检测
Qakbot采用了基于CRC32哈希校验的密钥派生机制,通过CRC32算法生成哈希值,并结合异或操作(XOR)来验证加密数据的有效性。具体过程如下:
样本首先解析PE文件导出表(Export Table)结构,对所有导出函数的名称进行CRC32哈希处理,然后与硬编码密钥0xA235CB91进行按位异或(XOR)操作。生成的哈希值随后与外部传入的API哈希值进行比对。如果这两个哈希值完全一致,则表明已经成功匹配到正确的API地址
了解算法和密钥后,可以解密API名称,并构建相应的结构体来存储每个DLL的API列表
字符串解密
解密流程
Qakbot通过XOR密钥来解密字符串,但该XOR密钥本身是使用AES算法加密的。具体流程如下:
首先调用CryptCreateHash API创建一个SHA-256哈希对象,并通过CryptHashData API对传入的aes_key进行哈希计算。随后,使用CryptDeriveKey API根据SHA-256哈希对象与AES-256算法派生出加密密钥。接着,通过CryptSetKeyParam API将AES配置为CBC模式,并将加密的XOR密钥前16字节作为 IV。完成密钥设置后,调用CryptDecrypt API对XOR密钥进行解密
得到解密后的XOR密钥后,使用该XOR密钥对字符串数组进行XOR解密,还原出明文字符串
通过 CyberChef,可以直观地再现该解密流程:
解密脚本
import hashlib
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
import idaapi
import idautils
import idc
def get_data(address, size):
data = idc.get_bytes(address, size)
if data is None:
raise ValueError(f"Failed to read data at address {hex(address)} with size {size}")
return data
def hex_to_int(x):
if type(x) == int:
return x
return int(x[:-1], 16)
def search_by_index(table , ind):
return(table[ind:].split('\x00')[0])
def set_hexrays_comment(address, text):
func_ea = idaapi.get_func(address).start_ea
cfunc = idaapi.decompile(func_ea)
if not cfunc:
print(f"❌ Function {func_ea:x} Decompilation failed")
return False
citems = cfunc.eamap.get(address, [])
if not citems:
return False
success = False
for item in citems:
tl = idaapi.treeloc_t()
tl.ea = address
tl.itp = idaapi.ITP_SEMI
tl.loc = item.obj_id
try:
cfunc.set_user_cmt(tl, text)
success = True
except Exception as e:
print(f"❌ Annotation failed {address:x} | type:{item.opname} | error:{str(e)}")
if success:
cfunc.save_user_cmts()
if hasattr(idaapi, 'refresh_idaview_anyway'):
idaapi.refresh_idaview_anyway()
else:
widget = idaapi.get_current_widget()
idaapi.refresh_idaview(widget)
return True
return False
def set_comment(address, text):
idc.set_cmt(address, text, 0)
set_hexrays_comment(address, text)
def get_valid_code_refs(func_names):
refs = []
for name in func_names:
target_ea = idc.get_name_ea_simple(name)
if target_ea == idc.BADADDR:
continue
refs.extend(list(idautils.CodeRefsTo(target_ea, 0)))
return list(set(refs))
def calc_sha256(data):
sha256_hash = hashlib.sha256()
sha256_hash.update(data)
return sha256_hash.digest()
def decrypt_xor_key(ciphertext, key, iv):
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted_data = cipher.decrypt(ciphertext)
plaintext = unpad(decrypted_data, AES.block_size)
return plaintext
def xor_decrypt(data, key):
decrypt_data = ''
for i in range(len(data)):
decrypt_data += chr(data[i] ^ key[i % len(key)])
return decrypt_data
def decrypt(encrypted_str, encrypted_xor_key, arg_aes_key):
aes_key = calc_sha256(arg_aes_key)
decrypted_xor_key = decrypt_xor_key(encrypted_xor_key[16:], aes_key, encrypted_xor_key[:16])
decrypted_str = xor_decrypt(encrypted_str, decrypted_xor_key)
return decrypted_str
def decrypt_text(decrypt_str, references):
for ref in references:
func_start = idaapi.get_func(ref).start_ea
cfunc = idaapi.decompile(func_start)
if ref not in cfunc.eamap:
continue
prev_instruction_address = idc.prev_head(ref)
while prev_instruction_address != idc.BADADDR:
if (idc.print_insn_mnem(prev_instruction_address) == "mov"
and idc.print_operand(prev_instruction_address, 0) == "ecx"
and idc.get_operand_type(prev_instruction_address, 1) == 5):
str_offset = idc.print_operand(prev_instruction_address, 1)
if not str_offset:
print(f"Warning: Empty str_offset at 0x{prev_instruction_address:x}")
break
try:
offset_int = hex_to_int(str_offset)
except ValueError:
print(f"Error: Invalid offset value at 0x{prev_instruction_address:x} - {str_offset}")
break
out_str = search_by_index(decrypt_str, offset_int)
print(f"🔓 0x{ref:x} -> {out_str} | offset:0x{offset_int:x}")
set_comment(ref, out_str)
break
else:
prev_instruction_address = idc.prev_head(prev_instruction_address)
if prev_instruction_address == idc.BADADDR:
print(f"Not found: {hex(ref)}")
break
if __name__ == "__main__":
CONFIG = {
"set1": {
"encrypt_data_addr": 0x1800297A0,
"encrypt_data_size": 0x1836,
"encrypt_xor_key_addr": 0x18002AFE0,
"encrypt_xor_key_size": 0xA0,
"aes_key_addr": 0x180029700,
"aes_key_size": 0x9F,
"ref_funcs": ["aug_Decrypt2", "aug_Decrypt"]
},
"set2": {
"encrypt_data_addr": 0x1800282A0,
"encrypt_data_size": 0x5AD,
"encrypt_xor_key_addr": 0x1800281C0,
"encrypt_xor_key_size": 0xD0,
"aes_key_addr": 0x180028150,
"aes_key_size": 0x63,
"ref_funcs": ["aug_DecryptStr2_", "aug_WrapDecryptStr"]
}
}
for config_name, config in CONFIG.items():
print(f"\n🔨 Processing configuration set [{config_name}]")
try:
encrypted_str = get_data(config["encrypt_data_addr"], config["encrypt_data_size"])
encrypted_xor_key = get_data(config["encrypt_xor_key_addr"], config["encrypt_xor_key_size"])
aes_key = get_data(config["aes_key_addr"], config["aes_key_size"])
decrypted_str = decrypt(encrypted_str, encrypted_xor_key, aes_key)
references = get_valid_code_refs(config["ref_funcs"])
print(f"📌 Found {len(references)} unique references:")
for i, ref in enumerate(references, 1):
print(f" [{i}] 0x{ref:x}")
decrypt_text(decrypted_str, references)
except Exception as e:
print(f"🔥 Processing of configuration set {config_name} failed: {str(e)}")
print("\n🎉 Processing completed!")
Comm
解密流程
Qakbot在.data段中嵌入了AES加密配置,其中包括两个关键数据结构:campaign info 和 c2 list。每个数据结构的第一个字节表示type,后续则为encrypt data
解密过程与字符串解密相同,但在解密之前,密钥会先通过SHA-256哈希算法进行处理。加密数据中的前16字节被用作 IV
。随后,使用密钥 T2X!wWMVH1UkMHD7SBdbgfgXrNBd(5dmRNbBI9
对加密数据进行解密。解密完成后,样本会使用前32字节的数据来验证解密结果的完整性,确保数据未被篡改
在 CyberChef 中的解密如下:
解密脚本
import hashlib
from datetime import datetime
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
import pefile
import binascii
import socket
def tohex(data):
if type(data) == str:
return binascii.hexlify(data.encode('utf-8'))
else:
return binascii.hexlify(data)
def get_ip(ip):
return socket.inet_ntoa(ip)
def get_section(filename, section_name=".data"):
try:
pe = pefile.PE(filename, fast_load=True)
for section in pe.sections:
raw_name = section.Name.decode('utf-8', errors='ignore')
clean_name = raw_name.rstrip('\x00').split('\x00')[0]
if clean_name == section_name:
return (section.get_data(section.VirtualAddress, section.SizeOfRawData))
return None
except (pefile.PEFormatError, FileNotFoundError) as e:
return None
finally:
if 'pe' in locals():
pe.close()
def calc_sha256(data):
sha256_hash = hashlib.sha256()
sha256_hash.update(data)
return sha256_hash.digest()
def decrypt_xor_key(ciphertext, key, iv):
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted_data = cipher.decrypt(ciphertext)
plaintext = unpad(decrypted_data, AES.block_size)
return plaintext
def decrypt(encrypted_str: bytes, arg_aes_key: bytes) -> bytes:
aes_key = calc_sha256(arg_aes_key)
iv = encrypted_str[1:17]
cipher = encrypted_str[17:]
decrypted_str = decrypt_xor_key(cipher, aes_key, iv)
return decrypted_str
def parse_config(input_str: bytes) -> None:
lines = input_str.strip().split(b'\r\n')
parsed_data = {}
for line in lines:
key, value = line.split(b'=')
parsed_data[key] = value
times = int(parsed_data[b'3'])
timestamp = datetime.fromtimestamp(times)
print(f"Botnet ID: {parsed_data[b'10']}")
print(f"40: {parsed_data[b'40']}")
print(f"Times: {timestamp}")
def parse_c2(ips):
i = 0
split_data = [ips[i:i+7] for i in range(1, len(ips), 8)]
for data in split_data:
ip = get_ip(data[:4])
port = int(tohex(data[4:6]), 16)
print('IP[{0}] = {1}:{2}'.format(i, ip, port))
i+=1
if __name__ == "__main__":
aes_key = b'T2X!wWMVH1UkMHD7SBdbgfgXrNBd(5dmRNbBI9'
filename = "your path"
size_rva = 0x1020
encrypt_config_rva = 0x1022
encrypt_c2_rva = 0x852
data_content = get_section(filename)
size = ord(data_content[size_rva : size_rva + 1])
encrypt_config_info = data_content[encrypt_config_rva : encrypt_config_rva + size]
encrypt_c2_info = data_content[encrypt_c2_rva : encrypt_c2_rva + size]
aes_key_hash = calc_sha256(aes_key)
config_info = decrypt(encrypt_config_info, aes_key)
c2_info = decrypt(encrypt_c2_info, aes_key)
print("------------------config info------------------")
print('sha256: ', tohex(config_info))
parse_config(config_info[32:])
print("------------------c2 info------------------")
print('sha256: ', tohex(c2_info))
parse_c2(c2_info[32:])
------------------config info------------------
sha256: b'560b887ca054e53b2dbf3601b1e518c9b40e96802c2e76b6e54f7879aad9bbfd31303d7463686b30380d0a34303d310d0a333d313730363731303935340d0a'
Botnet ID: b'tchk08'
40: b'1'
Times: 2024-01-31 22:22:34
------------------c2 info------------------
sha256: b'd640008cc859069dddc5b2869488ba83675e5f66a6ca19730b625ce900a830f0011fd2ad0a01bb0001b99cac3e01bb0001b971087b01bb00'
IP[0] = 31.210.173.10:443
IP[1] = 185.156.172.62:443
IP[2] = 185.113.8.123:443
通信
Qakbot的通信功能运行在独立线程中,主要通过HTTP协议与C2服务器进行通信。通信过程中,C2通信的密钥(4Lm7DW&yMFELN4D8oNp0CtKUfC2LAstORIBV
)经过AES加密处理后,被编码为Base64格式。样本使用InternetConnectA API与远程服务器建立连接,并通过HttpOpenRequestA和HttpSendRequestA API发送加密后的数据请求。最后,通过InternetReadFile API读取来自C2服务器的响应数据
建立通信后,C2服务器发送的命令通过整数值表示索引。这些命令以对应的索引形式传输
IOC
hash
sha256: af6a9b7e7aefeb903c76417ed2b8399b73657440ad5f8b48a25cfe5e97ff868f
c2
31.210.173.10:443
185.156.172.62:443
185.113.8.123:443