NSICollectionPlatform/NSICollectionPlatformServer/app.py

350 lines
9.6 KiB
Python
Raw Normal View History

2023-08-30 20:29:16 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from flask import Flask
from flask import request, make_response
import userManager as User
import dnsResolve
import subdomainLookup
import emailGrabbing
import portDetection
import logOperation as Operation
app = Flask(__name__)
data_source = {
"host": "118.24.151.27",
"username": "admin",
"password": "Liu947752894!",
"database": "NSI"
}
logOperation = Operation.OperationLog(**data_source)
user_manager = User.UserManager(**data_source)
if __name__ == '__main__':
app.run(host="127.0.0.1", port=8080)
def result(code="000000", desc="SUCCESS", data=None):
res = {
"code": code,
"desc": desc,
"data": data
}
return res
@app.before_request
def request_handle():
"""
请求拦截器根据token统一判断是否可用
:return: 拦截结果
"""
print(request.url)
print(request.headers)
url = request.url.split("/admin")[1]
print(url)
url = url.split("?")[0]
print(url)
if url not in ["/login", "/register"]:
flag = False
if "token" in request.headers:
token = request.headers['token']
flag = user_manager.valid_token(token)
if not flag:
response = make_response(result(code="100000", desc="valid token error"))
response.status = 401
return response
def log_operation(request_info, desc="", data=None):
"""
记录操作日志记录并保存在数据库中
:param request_info: 请求命令
:param desc: 用户执行的操作
:param data: 操作执行数据内容
"""
token = ""
username = ""
if "token" in request_info.headers \
and request_info.headers["token"] is not None \
and 'null' != request_info.headers["token"]:
token = request_info.headers["token"]
else:
username = request_info.json["username"]
logOperation.log(token=token, desc=desc, data=data, username=username)
@app.route('/admin/login', methods=['POST'])
def login():
"""
用户登录
:return: 登录结果
"""
username = request.json['username']
password = request.json['password']
__res = user_manager.valid_login(username, password)
if __res is None or len(__res) == 0:
res = result(code="10000", desc="用户不存在")
else:
data = {
"token": User.create_token(username + password),
"role": __res['role']
}
res = result(data=data)
log_operation(request_info=request, desc="用户登录", data={
"用户名": username,
"操作执行结果": res
})
return res
@app.route('/admin/register', methods=['POST'])
def register():
"""
用户注册此时无法设置权限只能管理员对用户设置权限
:return:
"""
username = request.json['username']
password = request.json['password']
role = request.json['role']
__res = user_manager.user_register(username, password, role)
if __res:
res = result(data="用户注册成功")
else:
res = result(code="10000", desc="用户已经存在")
log_operation(request_info=request, desc="用户注册", data={
"用户名": username,
"操作执行结果": res
})
return res
@app.route('/admin/user/add', methods=['POST'])
def user_add():
"""
新增用户
:return:
"""
__res = user_manager.user_add(
request.json['username'],
request.json['password'],
request.json['role'],
)
if __res:
res = result(data="新增用户成功")
else:
res = result(code="10000", desc="新增用户失败")
log_operation(request_info=request, desc="管理员新增用户", data={
"用户名": request.json['username'],
"角色": request.json['role'],
"操作执行结果": res
})
return res
@app.route('/admin/user/delete', methods=['POST'])
def user_delete():
"""
根据id删除用户
:return:
"""
__id = request.json['id']
__res = user_manager.user_delete(__id)
if __res:
res = result(data="删除用户成功")
else:
res = result(code="10000", desc="删除用户失败")
log_operation(request_info=request, desc="删除用户", data={
"ID": __id,
"操作结果": res
})
return res
@app.route('/admin/user/edit', methods=['POST'])
def user_edit():
"""
编辑用户
:return:
"""
__res = user_manager.user_edit(
request.json['id'],
request.json['username'],
request.json['password'],
request.json['role'],
)
if __res:
res = result(data="编辑用户成功")
else:
res = result(code="10000", desc="编辑用户失败")
log_operation(request_info=request, desc="编辑用户", data={
"ID": request.json['id'],
"用户名": request.json['username'],
"角色": request.json['role'],
"操作执行结果": res
})
return res
@app.route('/admin/user/list', methods=['GET'])
def user_list():
"""
获取用户列表
:return:
"""
__res = user_manager.user_list()
if __res:
return result(data=__res)
else:
return result(code="10000", desc="获取用户列表失败")
@app.route('/admin/user/get', methods=['GET'])
def user_get():
"""
根据id获取用户信息
:return:
"""
__res = user_manager.user_get(request.values['id'])
if __res:
return result(data=__res)
else:
return result(code="10000", desc="获取用户信息失败")
@app.route('/admin/user/authority/change', methods=['POST'])
def user_authority_change():
"""
根据id修改用户权限
:return:
"""
__id = request.json['id']
__role = request.json['role']
__res = user_manager.user_authority_change(__id, __role)
if __res:
res = result(data=__res)
else:
res = result(code="10000", desc="获取用户信息失败")
log_operation(request_info=request, desc="修改用户权限", data={
"ID": __id,
"角色": __role,
"操作执行结果": res
})
return res
@app.route('/admin/dns/resolution', methods=['GET'])
def dns_resolution():
"""
DNS解析
:return:
"""
__domainType = request.values["domainType"]
__domain = request.values["domain"]
if __domainType is None or __domainType not in ["A", "MX", "NS", "CNAME"]:
res = result(code="100000", desc="无效类型")
else:
if "A" == __domainType:
__data = dnsResolve.resolution_a(__domain)
elif "MX" == __domainType:
__data = dnsResolve.resolution_mx(__domain)
elif "NS" == __domainType:
__data = dnsResolve.resolution_ns(__domain)
elif "CNAME" == __domainType:
__data = dnsResolve.resolution_cname(__domain)
else:
__data = []
res = result(data=__data)
log_operation(request_info=request, desc="DNS解析", data={
"解析类型": __domainType,
"解析域名": __domain,
"解析结果": res
})
return res
@app.route('/admin/subdomain/lookup', methods=['GET'])
def subdomain_lookup():
"""
子域名查询
:return:
"""
domain = request.values["domain"]
if domain is None or domain == "":
res = result(code="100000", desc="域名为空")
else:
sub_domain_list = subdomainLookup.sub_domain_lookup(domain=domain)
__data = []
for key in sub_domain_list:
__data.append({
"href": key,
"title": sub_domain_list[key]
})
res = result(data=__data)
log_operation(request_info=request, desc="子域名查询", data={
"解析域名": domain,
"解析结果": res
})
return res
@app.route('/admin/email/grabbing', methods=['GET'])
def email_grabbing():
"""
邮箱账号抓取
:return:
"""
keyword = request.values["keyword"]
email_suffix = request.values["email_suffix"]
email_count = int(request.values["email_count"])
if keyword is None or keyword == "":
res = result(code="100000", desc="搜索关键值不能为空")
elif email_suffix is None or email_suffix == "":
res = result(code="100000", desc="搜索邮箱后缀不能为空")
else:
email_grabbing_result = emailGrabbing.EmailAccountGrabbing(keyword=keyword, email_suffix=email_suffix,
email_count=email_count).grabbing()
res = result(data=email_grabbing_result)
log_operation(request_info=request, desc="邮箱账号抓取", data={
"查询关键字": keyword,
"指定邮箱后缀": email_suffix,
"邮箱账号抓取结果": res
})
return res
@app.route('/admin/port/detection', methods=['GET'])
def port_detection():
"""
端口检测
:return:
"""
domain = request.values["domain"]
port = request.values["port"]
if domain is None or domain == "":
res = result(code="100000", desc="域名/IP不能为空")
else:
port_detection_result = portDetection.detection(domain=domain, port=port)
res = result(data=port_detection_result)
log_operation(request_info=request, desc="端口检测", data={
"域名或IP": domain,
"端口号": port,
"执行结果": res
})
return res
@app.route('/admin/operation/log/get', methods=['GET'])
def operation_log():
"""
操作记录
:return:
"""
username = request.values["username"]
__operation_log_list = logOperation.list_log(username=username)
res = result(data=__operation_log_list)
return res