阿里云漏洞敏捷管理自动化报告
https://next.api.aliyun.com/api/Sas/2018-12-03/ModifyStartVulScan?params={}
更多接口详细信息,请看API文档,地址:https://next.api.aliyun.com/document/Sas/2018-12-03/ModifyStartVulScan
Python
pip install alibabacloud_sas20181203==1.1.29
链接地址为:https://help.aliyun.com/document_detail/116401.htm?spm=a2c4g.11186623.0.0.b22b48f5z7iKkL#task-2245479
Python
# -*- coding: utf-8 -*-
import sys
from typing import List
from alibabacloud_sas20181203.client import Client as Sas20181203Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_sas20181203 import models as sas_20181203_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
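import os

# Read the AccessKey pair from the environment instead of writing it into the
# script, so the secret is not stored in source files, backups or shared copies.
# Prefer the AccessKey of a RAM user limited to the Security Center actions
# this script calls, rather than the account's primary key.
ak = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID", "")  # AccessKey ID
sk = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET", "")  # AccessKey Secret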
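class Sample:
    """Start a Security Center vulnerability scan through the SAS SDK."""

    def __init__(self):
        pass

    @staticmethod
    def create_client(
        access_key_id: str,
        access_key_secret: str,
    ) -> Sas20181203Client:
        """Build a Security Center client for the given AccessKey pair.

        Args:
            access_key_id: AccessKey ID used to sign requests.
            access_key_secret: AccessKey Secret paired with ``access_key_id``.

        Returns:
            A client configured for the ``tds.aliyuncs.com`` endpoint.
        """
        # Use the arguments rather than the module-level ak/sk, so the
        # credentials the caller passes are the ones that take effect.
        config = open_api_models.Config(
            access_key_id=access_key_id,
            access_key_secret=access_key_secret,
        )
        config.endpoint = 'tds.aliyuncs.com'
        return Sas20181203Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """Send one ModifyStartVulScan request.

        Args:
            args: Command-line arguments; not used.
        """
        # The AccessKey pair is defined once at module level and reused here.
        client = Sample.create_client(ak, sk)
        modify_start_vul_scan_request = sas_20181203_models.ModifyStartVulScanRequest(
            types='emg'  # emergency vulnerabilities only, as in the article's example
        )
        runtime = util_models.RuntimeOptions()
        try:
            client.modify_start_vul_scan_with_options(modify_start_vul_scan_request, runtime)
        except Exception as error:
            # Report the failure instead of discarding it; a scan that never
            # started would otherwise look like a successful run. Fall back to
            # the exception itself when it has no ``message`` attribute.
            print(getattr(error, 'message', error), file=sys.stderr)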
# Run the sample only when the file is executed as a script.
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    Sample.main(cli_args)
Python
# -*- coding: utf-8 -*-
import sys
from typing import List
from alibabacloud_sas20181203.client import Client as Sas20181203Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_sas20181203 import models as sas_20181203_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
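import os

# Read the AccessKey pair from the environment instead of writing it into the
# script, so the secret is not stored in source files, backups or shared copies.
# Prefer the AccessKey of a RAM user limited to the Security Center actions
# this script calls, rather than the account's primary key.
ak = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID", "")  # AccessKey ID
sk = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET", "")  # AccessKey Secret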
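class Sample:
    """Request an export of Security Center vulnerabilities through the SAS SDK."""

    def __init__(self):
        pass

    @staticmethod
    def create_client(
        access_key_id: str,
        access_key_secret: str,
    ) -> Sas20181203Client:
        """Build a Security Center client for the given AccessKey pair.

        Args:
            access_key_id: AccessKey ID used to sign requests.
            access_key_secret: AccessKey Secret paired with ``access_key_id``.

        Returns:
            A client configured for the ``tds.aliyuncs.com`` endpoint.
        """
        # Use the arguments rather than the module-level ak/sk, so the
        # credentials the caller passes are the ones that take effect.
        config = open_api_models.Config(
            access_key_id=access_key_id,
            access_key_secret=access_key_secret,
        )
        config.endpoint = 'tds.aliyuncs.com'
        return Sas20181203Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """Send one ExportVul request; the response is not used yet.

        Args:
            args: Command-line arguments; not used.
        """
        # The AccessKey pair is defined once at module level and reused here.
        client = Sample.create_client(ak, sk)
        # Export filter values as given in the article; 'emg' is the
        # emergency-vulnerability type used in the scan step.
        export_vul_request = sas_20181203_models.ExportVulRequest(
            type='emg',
            lang='zh',
            necessity='asap',
            dealed='n'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # The return value is discarded here; the article adjusts this
            # call in the following snippets to read the response.
            client.export_vul_with_options(export_vul_request, runtime)
        except Exception as error:
            # Report the failure instead of discarding it. Fall back to the
            # exception itself when it has no ``message`` attribute.
            print(getattr(error, 'message', error), file=sys.stderr)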
# Run the sample only when the file is executed as a script.
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    Sample.main(cli_args)
Python
# Send the export request and print the raw response object.
export_response = client.export_vul_with_options(export_vul_request, runtime)
print(export_response)
Python
# Send the export request and print the type of the response object.
export_response = client.export_vul_with_options(export_vul_request, runtime)
print(type(export_response))
#<class 'alibabacloud_sas20181203.models.ExportVulResponse'>
Python
# Send the export request and list the attributes stored on the response.
export_response = client.export_vul_with_options(export_vul_request, runtime)
print(export_response.__dict__)
Python
# Send the export request and print the type of the response body.
export_body = client.export_vul_with_options(export_vul_request, runtime).body
print(type(export_body))
#<class 'alibabacloud_sas20181203.models.ExportVulResponseBody'>
Python
# Send the export request and list the attributes stored on the response body.
export_body = client.export_vul_with_options(export_vul_request, runtime).body
print(export_body.__dict__)
#{'file_name': 'emg_20220831', 'id': 119828, 'request_id': '79B83B4B-1CE6-5987-9BDF-C3AB995E94D4'}
Python
# Send the export request and print only the id field of the response body.
export_body = client.export_vul_with_options(export_vul_request, runtime).body
print(export_body.id)
#119828
Python
# -*- coding: utf-8 -*-
import sys
from typing import List
from alibabacloud_sas20181203.client import Client as Sas20181203Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_sas20181203 import models as sas_20181203_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
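import os

# Read the AccessKey pair from the environment instead of writing it into the
# script, so the secret is not stored in source files, backups or shared copies.
# Prefer the AccessKey of a RAM user limited to the Security Center actions
# this script calls, rather than the account's primary key.
ak = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID", "")  # AccessKey ID
sk = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET", "")  # AccessKey Secret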
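class Sample:
    """Request a vulnerability export and print the export task ID."""

    def __init__(self):
        pass

    @staticmethod
    def create_client(
        access_key_id: str,
        access_key_secret: str,
    ) -> Sas20181203Client:
        """Build a Security Center client for the given AccessKey pair.

        Args:
            access_key_id: AccessKey ID used to sign requests.
            access_key_secret: AccessKey Secret paired with ``access_key_id``.

        Returns:
            A client configured for the ``tds.aliyuncs.com`` endpoint.
        """
        # Use the arguments rather than the module-level ak/sk, so the
        # credentials the caller passes are the ones that take effect.
        config = open_api_models.Config(
            access_key_id=access_key_id,
            access_key_secret=access_key_secret,
        )
        config.endpoint = 'tds.aliyuncs.com'
        return Sas20181203Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """Send one ExportVul request and print ``body.id`` of the response.

        Args:
            args: Command-line arguments; not used.
        """
        # The AccessKey pair is defined once at module level and reused here.
        client = Sample.create_client(ak, sk)
        # Export filter values as given in the article; 'emg' is the
        # emergency-vulnerability type used in the scan step.
        export_vul_request = sas_20181203_models.ExportVulRequest(
            type='emg',
            lang='zh',
            necessity='asap',
            dealed='n'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # body.id is the ID of the vulnerability export task.
            export_response = client.export_vul_with_options(export_vul_request, runtime)
            print(export_response.body.id)
        except Exception as error:
            # Report the failure on stderr so a missing ID on stdout can be
            # told apart from a successful run. Fall back to the exception
            # itself when it has no ``message`` attribute.
            print(getattr(error, 'message', error), file=sys.stderr)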
# Run the sample only when the file is executed as a script.
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    Sample.main(cli_args)
Python
# -*- coding: utf-8 -*-
import sys
from typing import List
from alibabacloud_sas20181203.client import Client as Sas20181203Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_sas20181203 import models as sas_20181203_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
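import os

# Read the AccessKey pair from the environment instead of writing it into the
# script, so the secret is not stored in source files, backups or shared copies.
# Prefer the AccessKey of a RAM user limited to the Security Center actions
# this script calls, rather than the account's primary key.
ak = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID", "")  # AccessKey ID
sk = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET", "")  # AccessKey Secret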
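class Sample:
    """Look up a vulnerability export task and print its download link."""

    def __init__(self):
        pass

    @staticmethod
    def create_client(
        access_key_id: str,
        access_key_secret: str,
    ) -> Sas20181203Client:
        """Build a Security Center client for the given AccessKey pair.

        Args:
            access_key_id: AccessKey ID used to sign requests.
            access_key_secret: AccessKey Secret paired with ``access_key_id``.

        Returns:
            A client configured for the ``tds.aliyuncs.com`` endpoint.
        """
        # Use the arguments rather than the module-level ak/sk, so the
        # credentials the caller passes are the ones that take effect.
        config = open_api_models.Config(
            access_key_id=access_key_id,
            access_key_secret=access_key_secret,
        )
        config.endpoint = 'tds.aliyuncs.com'
        return Sas20181203Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """Send one DescribeVulExportInfo request and print ``body.link``.

        Args:
            args: Command-line arguments; not used.
        """
        # The AccessKey pair is defined once at module level and reused here.
        client = Sample.create_client(ak, sk)
        describe_vul_export_info_request = sas_20181203_models.DescribeVulExportInfoRequest(
            export_id=119828  # the export task ID obtained in step 3
        )
        runtime = util_models.RuntimeOptions()
        try:
            # body.link is the download link of the exported Excel file.
            # NOTE(review): the article's sample link carries an Expires
            # parameter, so it looks like a time-limited signed URL; avoid
            # leaving it in shared logs while it is valid.
            export_info = client.describe_vul_export_info_with_options(
                describe_vul_export_info_request, runtime
            )
            print(export_info.body.link)
        except Exception as error:
            # Report the failure on stderr so a missing link on stdout can be
            # told apart from a successful run. Fall back to the exception
            # itself when it has no ``message`` attribute.
            print(getattr(error, 'message', error), file=sys.stderr)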
# Run the sample only when the file is executed as a script.
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    Sample.main(cli_args)
Python
pip install wget
Python
import time,wget,ssl #导入所需要的包
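# Certificate verification is left at Python's default (enabled), so the
# report is only accepted from a server that presents a valid certificate.
# If the download fails with "unable to get local issuer certificate", the
# interpreter cannot find a CA bundle: repair the local trust store (for
# example by pointing SSL_CERT_FILE at a CA bundle) instead of switching
# verification off for the whole process.
url = "https://vul-export.oss-cn-shanghai.aliyuncs.com/export/emg_20220826_xxx.zip?Expires=xxx"  # paste the download link obtained in the previous step
# Current local date as YYYYMMDD, used to name the archive.
now_time = time.strftime('%Y%m%d')
zip_name = "emg_{}.zip".format(now_time)
# The archive is saved in the current working directory.
wget.download(url, out=zip_name)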
import ssl
# NOTE(review): this swaps the default HTTPS context for an unverified one, so
# every later HTTPS request made through urllib in this process skips
# certificate and hostname checks, not only the report download.
ssl._create_default_https_context = ssl._create_unverified_context
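因为访问的是 https:// 地址,需要校验 SSL 证书,而直接使用 urllib 时本地证书校验失败(具体原因未查明),所以这里用 ssl._create_unverified_context 关闭了证书校验。注意:这会让当前进程内后续的 HTTPS 请求都不再校验证书,下载到的文件存在被中间人替换的风险;该报错通常说明本机 Python 找不到可用的 CA 根证书,修复本机证书库后即可保留校验。不关闭校验时会报下面这个错: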
urllib.error.URLError: <urlopen error [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:997)>
Python
# NOTE(review): the extraction snippet that follows imports the standard-library
# `zipfile` module, which ships with Python; this extra package is not needed for it.
pip install zipfile36
Python
import zipfile #导入所需模块
# Open the downloaded archive and unpack every member into the current
# working directory, closing the archive afterwards.
report_archive = zipfile.ZipFile("emg_20220826.zip")
try:
    report_archive.extractall()
finally:
    report_archive.close()
6、xlsx转成csv格式
Python
pip install pandas
Python
import pandas as pd #导入所需模块
# Load the exported workbook with its first column as the index, then write
# the same table out as a UTF-8 CSV file.
vul_sheet = pd.read_excel("emg_20220826.xlsx", index_col=0)
vul_sheet.to_csv("emg_20220826.csv", encoding='utf-8')
在执行上面代码的过程中,发现输出了如下WARN日志:
UserWarning: Workbook contains no default style, apply openpyxl's default
warn("Workbook contains no default style, apply openpyxl's default")
中文翻译:
用户警告:工作簿不包含默认样式,请应用openpyxl的默认样式
警告(“工作簿不包含默认样式,应用openpyxl的默认样式”)
故障分析:
这个excel文件没有设置默认的样式。一般这种没有默认样式的excel文档是由java程序生成的,不是像windows系统日常使用中通过右键点击创建的excel文档。
解决方法:
第一种方法,导入warnings模块,忽略警告级别的提示。(这里我用的是这一种方法)
import warnings
# Ignores every warning category for the rest of the process, not only the
# openpyxl default-style notice.
warnings.simplefilter("ignore")
第二种方法,用excel程序,打开保存一下这个文档,使该文档程序属性变成Microsoft EXCEL,同时应该也保存了默认的excel样式
Python
import pandas as pd
import warnings
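# Suppress only the known openpyxl default-style notice, so that any other
# warning raised while reading the workbook stays visible.
warnings.filterwarnings(
    "ignore",
    message="Workbook contains no default style",
    category=UserWarning,
)
# Convert the exported workbook to a UTF-8 CSV file.
pd.read_excel("emg_20220826.xlsx", index_col=0).to_csv("emg_20220826.csv", encoding='utf-8')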
Python
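-- Vulnerability rows loaded from the Security Center export; the meaning of
-- each column is given in its COMMENT clause.
CREATE TABLE `webbugsec` (
  `id` int(25) NOT NULL COMMENT '漏洞ID',
  `vulnerability` varchar(100) NOT NULL COMMENT '漏洞名称',
  `urgency_level` varchar(25) NOT NULL COMMENT '修复紧急度',
  `affected_assets_id` varchar(25) DEFAULT NULL COMMENT '影响资产ID',
  `affected_assets_ip_internet` varchar(50) DEFAULT NULL COMMENT '影响资产IP(公网)',
  `affected_assets_ip_intranet` varchar(50) DEFAULT NULL COMMENT '影响资产IP(私网)',
  `affected_assets_name` varchar(100) DEFAULT NULL COMMENT '影响资产备注名称',
  `first_detect_time` varchar(50) NOT NULL COMMENT '首次发现时间',
  `last_detect_time` varchar(50) NOT NULL COMMENT '最近一次发现时间',
  `handle_time` varchar(50) DEFAULT NULL COMMENT '处理时间',
  `note` varchar(1500) NOT NULL COMMENT '漏洞说明',
  `status` varchar(25) NOT NULL COMMENT '漏洞状态',
  `fix_command` varchar(100) DEFAULT NULL COMMENT '修复命令',
  `cve_id` varchar(50) DEFAULT NULL COMMENT 'CVE编号',
  `tags` varchar(50) DEFAULT NULL COMMENT '标签',
  `suggestion` varchar(1500) NOT NULL COMMENT '修复建议'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- The terminating semicolon lets this statement run in the same batch as the
-- CREATE TABLE that follows it.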
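-- Asset lookup table; the report query joins it to webbugsec on
-- affected_assets_id. Column meanings are given in the COMMENT clauses.
CREATE TABLE `code` (
  `code` varchar(25) NOT NULL COMMENT 'code',
  `affected_assets_name` varchar(50) NOT NULL COMMENT '影响资产备注名称',
  `affected_assets_id` varchar(25) NOT NULL COMMENT '影响资产ID',
  `affected_assets_ip_internet` varchar(50) DEFAULT NULL COMMENT '影响资产IP(公网)',
  `affected_assets_ip_intranet` varchar(50) NOT NULL COMMENT '影响资产IP(私网)',
  `project_description` varchar(100) DEFAULT NULL COMMENT '项目描述',
  `bsm` varchar(100) NOT NULL COMMENT 'BSM'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;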
Python
# Rewrite the CSV in place, swapping the Chinese header row for the English
# column names used by the webbugsec table.
csv_path = "emg_20220826.csv"
chinese_header = "漏洞ID,漏洞名称,修复紧急度,影响资产ID ,影响资产IP(公网),影响资产IP(私网),影响资产备注名称,首次发现时间,最近一次发现时间,处理时间,漏洞说明,漏洞状态,修复命令,CVE编号,标签,修复建议"
english_header = 'id,vulnerability,urgency_level,affected_assets_id,affected_assets_ip_internet,affected_assets_ip_intranet,affected_assets_name,first_detect_time,last_detect_time,handle_time,note,status,fix_command,cve_id,tags,suggestion'

with open(csv_path, 'r', encoding='utf8') as source_file:
    original_text = source_file.read()

# str.replace substitutes every occurrence of the header text it finds.
converted_text = original_text.replace(chinese_header, english_header)

with open(csv_path, 'w', encoding='utf8') as target_file:
    target_file.write(converted_text)
Python
#导入所需模块
import pandas as pd #as表示给这个模块取别名,调用这个模块的时候直接用别名就可以了
from urllib import parse #表示使用ullib模块中的parse方法
from sqlalchemy import create_engine
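from sqlalchemy import text

# Database connection settings (placeholders to fill in).
# NOTE(review): keep the real password out of the script, and prefer a
# dedicated account limited to the webbugsec and code tables over root.
user = "root"
password = "xxx"
host = "127.0.0.1"
db = "db_name"
# URL-encode the password so special characters cannot break the connection URL.
pwd = parse.quote_plus(password)
# Engine that manages the connections to the MySQL database.
engine = create_engine(f"mysql+pymysql://{user}:{pwd}@{host}:3306/{db}?charset=utf8")

# Empty the webbugsec table before loading the new export. text() keeps the
# raw statement executable on SQLAlchemy versions that reject plain strings.
with engine.begin() as conn:
    conn.execute(text("truncate table webbugsec"))
# Print the table to confirm that it is empty.
print(pd.read_sql(sql="select * from webbugsec;", con=engine))
# Read the CSV file produced by the previous step.
df = pd.read_csv("emg_20220826.csv")
# Append the rows to webbugsec (the table was emptied above).
df.to_sql(name='webbugsec', con=engine, index=False, if_exists='append')
# Join the vulnerabilities with the asset table; adjust the query as needed.
# The result gets its own name so it does not overwrite the database name in `db`.
report = pd.read_sql(sql="select w.affected_assets_name as '影响资产备注名称',c.code as 'CODE',c.project_description as '项目描述',c.bsm as 'BSM',w.affected_assets_id as '影响资产ID',w.affected_assets_ip_internet as '影响资产IP(公网)',w.affected_assets_ip_intranet as '影响资产IP(私网)',w.vulnerability as '漏洞名称', w.note as '漏洞说明',w.suggestion as '修复建议' from webbugsec w,code c where w.affected_assets_id=c.affected_assets_id ORDER BY c.code ASC;", con=engine)
# Write the joined result to the report workbook. The context manager saves
# and closes the file, which also works on pandas versions without
# ExcelWriter.save() and without the encoding argument of to_excel().
excel_o_file_name = "./应急漏洞_20220826.xlsx"
with pd.ExcelWriter(excel_o_file_name) as excel_o_file:
    report.to_excel(excel_o_file, index=False, sheet_name='Sheet1')
# Release the engine's pooled database connections.
engine.dispose()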
8、通知
Python
import smtplib
from urllib import parse
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
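def send_mail(MAIL_SERVER, MAIL_USERNAME, MAIL_PASSWORD, accept_list, Cc_list, SUBJECT, text, file_name):
    """Send a plain-text mail with one attached file over SMTP with TLS on port 465.

    Args:
        MAIL_SERVER: Host name of the SMTP server.
        MAIL_USERNAME: Sender address, also used as the login name.
        MAIL_PASSWORD: Password or authorisation code for the login.
        accept_list: List of recipient addresses for the To header.
        Cc_list: List of addresses for the Cc header.
        SUBJECT: Subject line.
        text: Plain-text body, sent as UTF-8.
        file_name: Path of the file to attach.

    Returns:
        True when the message was handed to the server; False when connecting,
        logging in or sending raised an exception (the exception is printed).
    """
    import os
    import ssl
    from email.mime.application import MIMEApplication

    message = MIMEMultipart()
    message['From'] = MAIL_USERNAME  # sender
    # Address lists in mail headers are comma-separated.
    message['To'] = ", ".join(accept_list)  # recipients
    message["Cc"] = ", ".join(Cc_list)  # carbon-copy recipients
    message['Subject'] = SUBJECT
    message.attach(MIMEText(text, 'plain', 'utf-8'))

    # Read the attachment through a context manager so the file is closed.
    with open(file_name, 'rb') as attachment_file:
        payload = attachment_file.read()
    # MIMEApplication produces a single application/octet-stream part encoded
    # as base64; assigning Content-Type on a MIMEText part would add a second
    # Content-Type header instead of replacing the first.
    att1 = MIMEApplication(payload)
    # Name the attachment after the file that is actually attached; add_header
    # also handles non-ASCII file names.
    att1.add_header('Content-Disposition', 'attachment', filename=os.path.basename(file_name))
    message.attach(att1)
    try:
        # Port 465 with TLS. The explicit default context verifies the server
        # certificate and host name before the login credentials are sent.
        with smtplib.SMTP_SSL(MAIL_SERVER, 465, context=ssl.create_default_context()) as mailServer:
            mailServer.login(MAIL_USERNAME, MAIL_PASSWORD)
            mailServer.sendmail(MAIL_USERNAME, accept_list + Cc_list, message.as_string())
        return True
    except Exception as e:
        print(e)
        return False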
def main():
    """Fill in the mail settings and print the result returned by send_mail."""
    smtp_host = ''  # SMTP server
    sender = ''  # sender address
    secret = ''  # sender's authorisation code or password
    recipients = ['', ]  # recipients; list several addresses for more than one
    cc_recipients = ['',]  # Cc recipients; list several addresses for more than one
    subject = "云安全中漏洞管理"  # subject
    body = "云安全中心漏洞管理:\n应急漏洞:emg_20220729.xlsx文件大小[33kb]"  # body text
    attachment_path = "./应急漏洞_20220831.xlsx"  # path of the attachment
    sent = send_mail(
        smtp_host,
        sender,
        secret,
        recipients,
        cc_recipients,
        subject,
        body,
        attachment_path,
    )
    print(sent)
# Send the mail only when the file is executed as a script.
if __name__ == "__main__":
    main()
Python
pip install DingtalkChatbot
Python
from dingtalkchatbot.chatbot import DingtalkChatbot
# Create the robot client from the group's webhook address.
# NOTE(review): the webhook address is what lets a sender post to the group,
# so treat it like a credential and keep it out of shared code.
webhook_address = '这里填写WebHook地址'
xiaoding = DingtalkChatbot(webhook_address)
# Send a text message and @-mention the listed users.
mention_mobiles = ['这里填写要@的用户与钉钉绑定的手机号',]
xiaoding.send_text(msg="这里填写通知内容", at_mobiles=mention_mobiles)
Python
from dingtalkchatbot.chatbot import DingtalkChatbot
import os
# Create the robot client from the group's webhook address.
# NOTE(review): the webhook address is what lets a sender post to the group,
# so treat it like a credential and keep it out of shared code.
webhook_address = '这里填写WebHook地址'
xiaoding = DingtalkChatbot(webhook_address)
# Size of the exported workbook in whole kilobytes.
size_in_bytes = os.path.getsize("./应急漏洞_20220831.xlsx")
filesize = int(size_in_bytes / 1024)
# Send a markdown notification and @-mention the listed users.
markdown_text = '### 云安全中心漏洞管理\n\n**应急漏洞:**emg_20220729.xlsx文件大小[{}kb]\n\n'.format(filesize)
mention_mobiles = ['这里填写要@的用户与钉钉绑定的手机号', ]
xiaoding.send_markdown(title="云安全中心漏洞管理", text=markdown_text, at_mobiles=mention_mobiles)
最终钉钉通知的效果:
Python
[root@iZuf6d2ri21ufpdxZ ~]# crontab -l
#阿里云云安全中心漏洞任务
10 8 8 * * /usr/local/bin/python3 /usr/local/aliyun/ali-secemg.py
3、系统补丁更新