如何构建一个自己的 NVD 漏洞数据库?

因为毕设需要,笔者必须构建一个漏洞数据库,其中包含 CVE ID、CVSS 评分、攻击方式等信息。经过前期的一些尝试后,发现现在网上找到的主流方式均不再适用,因此只好自己写了个脚本。整个过程用到的工具 / 组件有:Python3,Pycharm,PostgreSQL,Docker。

前期尝试

因为项目中用到了攻击图生成工具 MulVAL,而 MulVAL 中也提供了漏洞数据库的构建脚本 nvd_sync.sh,简单分析代码后可以知道它是利用 NVD 提供的 XML 文件来构建数据库的。可是我配置好环境后运行该脚本,无法使用。简单看一下报错,是下载失败了。脚本中下载的语句如下:

while [ $i -le $year ]; do
wget http://nvd.nist.gov/download/nvdcve-$i.xml.gz
gunzip -f nvdcve-$i.xml.gz
i=`expr $i + 1`
done

我试着手动从该链接下载,但是失败了,前往官网查看,发现已经 404 了。但是天无绝人之路,虽然该文件无法下载,但是我看到了另外一个下载页面:NVD - Data Feeds (nist.gov)。在该页面手动下载 XML 文件,看到下载链接形如 https://nvd.nist.gov/feeds/xml/cve/trans/es/nvdcve-2021trans.xml.gz。抱着侥幸的心理,我修改了代码里的下载链接,祈祷只是下载链接变了而文件内容没有变。结果当然是…… 失败了!并不是下载失败,而是数据解析失败,数据库中有了相关字段,但值都是空的。

还在网上看到了一些使用爬虫的方式,但我觉得爬虫太慢了,而且也不知道 NVD 有没有反爬虫机制,不如直接下载文件进行解析、填充。由于对 XML 文件格式不太熟悉,所以就选择了 NVD 提供的另外一种文件类型:JSON。

数据库的选择

简单一点的话可以直接使用 MySQL,但因为我想尝试一点不一样的,同时也看到了 PostgreSQL 在数据量很大时性能比较好,因此就选择了 PostgreSQL。然后为了保证可迁移性,并没有在本地搭建 PostgreSQL 环境,而是使用了 Docker。关于 Docker 的说明与安装在这里不再介绍。

PostgreSQL 官方提供了 Docker,只需要 pull 下来启动即可,我使用的启动命令是:

docker run --name nvd-db -e POSTGRES_PASSWORD=自定义密码 \
-e POSTGRES_USER=nvd \
-e POSTGRES_DB=nvd \
-e TZ='Asia/Shanghai' \
-e ALLOW_IP_RANGE=0.0.0.0/0 \
-v /root/Code/Auto-PT-Auxiliary/db:/var/lib/postgresql/data \
-p 54321:5432 \
--restart always \
-d postgres

分析 JSON 文件结构

在刚才提到的网站下载一个 json 文件,打开后分析一下里面的结构,然后使用 python 的 json 库从中提取到需要的信息,详细含义如下:

import json
with open("../config/record.json",'r') as load_f:
load_dict = json.load(load_f)

# 获取第1个CVE对象
load_dict['CVE_Items'][1]
# 获取CVE ID
load_dict['CVE_Items'][1024]['cve']['CVE_data_meta']['ID']
# 获取该cve的CVSS3 base score
load_dict['CVE_Items'][1024]['impact']['baseMetricV3']['cvssV3']['baseScore']
# 获取该cve的CVSS3 exploitability score
load_dict['CVE_Items'][1024]['impact']['baseMetricV3']['exploitabilityScore']
# 获取该cve的CVSS3 impact score
load_dict['CVE_Items'][1024]['impact']['baseMetricV3']['impactScore']
# 获取CVSS3的攻击向量:network, local, adjacent network, physical
load_dict['CVE_Items'][1024]['impact']['baseMetricV3']['cvssV3']['attackVector']
# 获取漏洞等级:High, Medium, Low
load_dict['CVE_Items'][1024]['impact']['baseMetricV3']['cvssV3']['baseSeverity']
# 获取该cve的CVSS2 base score
load_dict['CVE_Items'][1024]['impact']['baseMetricV2']['cvssV3']['baseScore']
# 获取该cve的CVSS2 exploitability score
load_dict['CVE_Items'][1024]['impact']['baseMetricV2']['exploitabilityScore']
# 获取该cve的CVSS2 impact score
load_dict['CVE_Items'][1024]['impact']['baseMetricV2']['impactScore']

有的 CVE ID 没有被使用,但是在 json 文件中仍然存在,这些在入库时需要做筛选。目前发现筛选出来的标准之一是 impact 字段如果为 {},即没有任何数据,那么说明该 ID 没有被使用。可以用类似于 if load_dict['CVE_Items'][1024]['impact'] != {} 这样的 if 语句进行一个简单的过滤。

不过需要注意,nvd 的漏洞数据在 2015 年及之前都只有 CVSS2 评分,没有 CVSS3,直到 2016 年才出现了 CVSS3。而最新(2021.12 月去看的)的一些漏洞,又只有 CVSS3,没有 CVSS2。

将数据导入数据库

直接上代码了:

import json
import os

import psycopg2


class DataBaseConnector:
def __init__(self, _db_config: dict):
self.db_host = _db_config['host'] # 数据库服务器
self.db_port = _db_config['port'] # 数据库端口,MySQL默认3306,PostgreSQL默认5432
self.db_user = _db_config['user'] # 数据库用户
self.db_password = _db_config['password'] # 数据库密码
self.db_name = _db_config['db_name'] # 数据库名
self.db_table = _db_config['db_table'] # 数据表名

self.conn = self.connect_db() # 数据库连接对象

self._current_path = os.path.dirname(__file__) # 当前文件路径
self.start_year = int(_db_config['start_year']) # NVD开始年份
self.end_year = int(_db_config['end_year']) # NVD结束年份

def connect_db(self):
"""
连接数据库,返回conn

:return: 数据库连接对象
"""
conn = psycopg2.connect(
host=self.db_host,
port=self.db_port,
user=self.db_user,
password=self.db_password,
database=self.db_name
)
return conn

def fill_db(self):
cursor = self.conn.cursor()
print("正在清理过时的数据库......")
cursor.execute("drop table if exists nvd CASCADE")
cursor.execute("drop type if exists vector")
cursor.execute("drop type if exists level")
# 对于攻击向量的枚举类型: ('NETWORK', 'ADJACENT_NETWORK', 'LOCAL', 'PHYSICAL')
enum_vector_sql = "create type vector as enum ('NETWORK', 'ADJACENT_NETWORK', 'LOCAL', 'PHYSICAL')"
# 对于漏洞等级的枚举类型: ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW')
enum_level_sql = "create type level as enum ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW')"
# 创建nvd数据表的SQL语句
create_table_sql = f"create table {self.db_table} (cve_id varchar(20) not null, \
attack_vector vector not null, vuln_level level not null, base_score decimal not null, \
exploitability_socre decimal not null, impact_score decimal not null, primary key(cve_id))"
cursor.execute(enum_vector_sql) # 创建vector
cursor.execute(enum_level_sql) # 创建level
cursor.execute(create_table_sql) # 创建nvd数据表

# 向nvd数据表中插入数据
for year in range(self.start_year, self.end_year + 1):
with open(f'{self._current_path}/nvd_json/nvdcve-1.1-{year}.json', 'r') as f:
print(f"正在导入{year}年的CVE数据......", end="")
load_dict = json.load(f)
for cve in load_dict['CVE_Items']:
"""
因为json文件中有的项可能只是占位项,并没有真的漏洞,
因此首先要判断是不是占位项,如果是,跳过当前项。
判断的标准很简单,看"impact"字段是否为空。
"""
if cve['impact'] == {}:
continue
cve_id = cve['cve']['CVE_data_meta']['ID'] # CVE ID
"""
2015年之前的漏洞都只有CVSS2评分,因此只能使用CVSS2;
而2021年最新的几个漏洞只有CVSS3,没有CVSS2,因此只能使用CVSS3;
中间的年份既有CVSS2,又有CVSS3。因此,无法统一标准,只能妥协:
如果存在CVSS3,优先使用CVSS3;否则再使用CVSS2。
"""
if 'baseMetricV3' in cve['impact']: # CVSS3
attack_vector = cve['impact']['baseMetricV3']['cvssV3']['attackVector'] # 攻击向量
vuln_level = cve['impact']['baseMetricV3']['cvssV3']['baseSeverity'] # 漏洞等级
base_score = cve['impact']['baseMetricV3']['cvssV3']['baseScore'] # base score
exploitability_score = cve['impact']['baseMetricV3'][
'exploitabilityScore'] # exploitability score
impact_score = cve['impact']['baseMetricV3']['impactScore'] # impact score
else: # CVSS2
attack_vector = cve['impact']['baseMetricV2']['cvssV2']['accessVector']
vuln_level = cve['impact']['baseMetricV2']['severity']
base_score = cve['impact']['baseMetricV2']['cvssV2']['baseScore']
exploitability_score = cve['impact']['baseMetricV2']['exploitabilityScore']
impact_score = cve['impact']['baseMetricV2']['impactScore']
# 向数据库中插入数据的SQL语句
insert_sql = f"insert into {self.db_table} values ('{cve_id}','{attack_vector}'," \
f"'{vuln_level}',{base_score:.2f},{exploitability_score:.2f},{impact_score:.2f})"
cursor.execute(insert_sql) # 插入本条CVE数据
print("导入成功!")
self.conn.commit()
print(f"\n[SUCCESS]成功向数据库中导入{self.start_year}年到{self.end_year}年的所有漏洞数据!")

def download_nvd_json(self):
"""
从NVD下载指定年份区间内的json文件并解压
"""
os.system(f"rm -rf {self._current_path}/nvd_json/*")
for year in range(self.start_year, self.end_year + 1):
# 下载文件到 /db/nvd_json/目录下
os.system(
f"wget -P {self._current_path}/nvd_json/ \
https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-{year}.json.gz"
)
os.system(f"gunzip {self._current_path}/nvd_json/nvdcve-1.1-{year}.json.gz")

def close(self):
"""
关闭数据库连接
"""
self.conn.colse()


if __name__ == "__main__":
db_config = {'host': '数据库服务器IP', 'port': '数据库端口(默认5432)', 'user': 'nvd', 'password': '自定义密码', 'db_name': 'nvd', 'db_table': 'nvd', 'start_year': 2002, 'end_year': 2021}
db = DataBaseConnector(db_config)
# db.download_nvd_json()
db.fill_db()

最终完成了所有数据的插入,共有 165947 条 CVE 数据,完成这么多数据的解析和插入总共用时 46.79541 秒,应该是个还可以接受的时间吧,毕竟没有做什么优化。