scripts/python/ipv4_aliyun.py
2023-08-31 14:42:33 +08:00

213 lines
8.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
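@Time : 2023/08/23
@Author : Carol
@Site :
@File : ipv4_aliyun.py
@Software: PyCharm
@Description: Dynamic DNS helper that detects the current public IPv4 address and writes it to an Alibaba Cloud DNS (Alidns) record.
- Versions
-- python==3.8
-- alibabacloud_alidns20150109==3.0.7
-- aliyun-python-sdk-domain==3.14.9
- Installing dependencies
-- pip3 install alibabacloud_alidns20150109==3.0.7
-- pip3 install aliyun-python-sdk-domain==3.14.9
-- pip3 install requests (imported by this script; the original notes pin no version)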
"""
import json
import requests
from alibabacloud_alidns20150109 import models as alidns_20150109_models
from alibabacloud_alidns20150109.client import Client as Alidns20150109Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class RemoteIp:
    """Ask an Emby server which remote address it reports for itself.

    The server is queried in two steps: authenticate by user name to obtain
    an access token, then read ``RemoteAddresses`` from ``/emby/System/Info``.
    """

    # Seconds to wait on each HTTP call; without a timeout an unreachable
    # server would block the script forever.
    REQUEST_TIMEOUT = 10

    def __init__(self, url, username, passwd):
        """Store the connection details.

        :param url: Base URL of the Emby server, e.g. ``http://127.0.0.1:8096``
        :param username: Emby user name used to authenticate
        :param passwd: Password of that user
        """
        self.url = url
        self.username = username
        self.passwd = passwd

    @staticmethod
    def _host_from_address(address):
        """Return only the host part of an address such as ``http://1.2.3.4:8096``.

        Bracketed IPv6 hosts (``http://[2001:db8::1]:8096``) and addresses
        without a scheme (``1.2.3.4:8096``) are handled as well. Host names
        are returned lower-cased, as ``urlsplit`` normalises them.

        :raises ValueError: if no host can be found in ``address``
        """
        from urllib.parse import urlsplit

        candidate = address.strip()
        if "//" not in candidate:
            # urlsplit only recognises a network location after "//".
            candidate = "//" + candidate
        host = urlsplit(candidate).hostname
        if not host:
            raise ValueError("no host found in address: " + repr(address))
        return host

    def __get_token(self):
        """Authenticate against Emby and return the ``AccessToken`` string.

        :raises requests.HTTPError: if the server answers with an error status
        """
        _url = self.url + "/emby/Users/authenticatebyname?X-Emby-Client=Emby%20Web&X-Emby-Device-Name=Chrome%20macOS&X-Emby-Device-Id=fa6b7733-23fc-4cb7-bae3-d7f0536b485e&X-Emby-Client-Version=4.7.13.0&X-Emby-Language=zh-cn"
        # The credentials travel as the JSON request body.
        response = requests.post(
            _url,
            json={"Username": self.username, "Pw": self.passwd},
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return json.loads(response.text)["AccessToken"]

    def get_remote_addr(self):
        """Return the host of the first remote address reported by the server.

        :return: host string, e.g. ``1.2.3.4``
        :raises requests.HTTPError: if the server answers with an error status
        :raises KeyError: if the response has no ``RemoteAddresses`` entry
        :raises IndexError: if ``RemoteAddresses`` is empty
        """
        token = self.__get_token()
        full_url = self.url + "/emby/System/Info?X-Emby-Token=" + token
        response = requests.get(full_url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        addresses = list(json.loads(response.text)["RemoteAddresses"])
        if not addresses:
            raise IndexError("Emby server reported an empty RemoteAddresses list")
        return self._host_from_address(str(addresses[0]))
def request_params(request, value):
    """Store ``value`` on ``request`` as its ``value`` attribute.

    :param request: object that receives the attribute
    :param value: the value to assign
    :return: None
    """
    setattr(request, "value", value)
class AliyunOperation:
    """Small wrapper around the Alibaba Cloud DNS (Alidns 2015-01-09) client.

    Every public method builds one request model, sends it, and returns the
    response body when the call answered with status 200. When the SDK call
    raises, the failure is printed and ``None`` is returned instead.
    """

    def __init__(self, access_key_id, access_secret):
        """Create the Alidns client.

        :param access_key_id: ID of the Alibaba Cloud AccessKey
        :param access_secret: secret of the Alibaba Cloud AccessKey
        """
        config = open_api_models.Config(
            # Required: your AccessKey ID
            access_key_id=access_key_id,
            # Required: your AccessKey Secret
            access_key_secret=access_secret,
        )
        # For the list of endpoints see https://api.aliyun.com/product/Alidns
        config.endpoint = 'alidns.cn-hangzhou.aliyuncs.com'
        self.client = Alidns20150109Client(config)

    @staticmethod
    def _error_message(error):
        """Return a printable message for ``error``.

        Prefers a non-empty string ``message`` attribute when the exception
        has one and falls back to ``str(error)`` otherwise, so exceptions
        without that attribute no longer trigger an ``AttributeError``
        inside the error handler.
        """
        message = getattr(error, "message", None)
        if isinstance(message, str) and message:
            return message
        return str(error)

    def _send(self, action, operation, request):
        """Run one client call and return its body, or ``None`` on failure.

        :param action: short label used in the printed failure message
        :param operation: bound ``*_with_options`` method of ``self.client``
        :param request: request model passed to ``operation``
        :return: ``response.body`` when the status code is 200, else ``None``
        """
        runtime = util_models.RuntimeOptions()
        try:
            response = operation(request, runtime)
        except Exception as error:
            # Deliberately broad: these calls are best-effort and callers
            # expect None rather than an exception when the request fails.
            print("alidns " + action + " failed: " + self._error_message(error))
            return None
        if 200 == response.status_code:
            return response.body
        return None

    def describe_subdomain_records(self, subdomain, domain_name):
        """Query the DNS records of a subdomain.

        API: https://api.aliyun.com/api/Alidns/2015-01-09/DescribeSubDomainRecords
        The response carries a lot of data; this script currently only uses
        the record count and the RecordId, which is needed to modify a record.

        :param subdomain: full subdomain to look up
        :param domain_name: the domain the subdomain belongs to
        :return: response body on status 200, otherwise ``None``
        """
        describe_sub_domain_records_request = alidns_20150109_models.DescribeSubDomainRecordsRequest(
            sub_domain=subdomain,
            domain_name=domain_name,
        )
        return self._send(
            "DescribeSubDomainRecords",
            self.client.describe_sub_domain_records_with_options,
            describe_sub_domain_records_request,
        )

    def add_record(self, domain_name, rr, domain_type, value):
        """Add a DNS record.

        API: https://api.aliyun.com/api/Alidns/2015-01-09/AddDomainRecord
        Use this when no record exists yet for the host.

        :param domain_name: domain name, e.g. baidu.com
        :param rr: host record; to resolve @.example.com pass "@", not an empty string
        :param domain_type: record type, see https://help.aliyun.com/document_detail/29805.html?spm=api-workbench.api_explorer.0.0.6b1a134bcODqcI
        :param value: record value (string)
        :return: response body on status 200, otherwise ``None``
        """
        add_domain_record_request = alidns_20150109_models.AddDomainRecordRequest(
            domain_name=domain_name,
            rr=rr,
            type=domain_type,
            value=value,
        )
        return self._send(
            "AddDomainRecord",
            self.client.add_domain_record_with_options,
            add_domain_record_request,
        )

    def update_record(self, domain_type, value, rr, record_id):
        """Update an existing DNS record.

        An existing record cannot be added again and has to be updated. The
        update needs the RecordId, which the record query returns.

        :param domain_type: record type, see https://help.aliyun.com/document_detail/29805.html?spm=api-workbench.api_explorer.0.0.6b1a134bcODqcI
        :param value: record value (string)
        :param rr: host record; to resolve @.example.com pass "@", not an empty string
        :param record_id: ID of the record to update
        :return: response body on status 200, otherwise ``None``
        """
        update_domain_record_request = alidns_20150109_models.UpdateDomainRecordRequest(
            record_id=record_id,
            rr=rr,
            type=domain_type,
            value=value,
        )
        return self._send(
            "UpdateDomainRecord",
            self.client.update_domain_record_with_options,
            update_domain_record_request,
        )
def __execute(domain_name="cnkj.site", rr="redirect", record_type="A"):
    """Point ``<rr>.<domain_name>`` at the current public IPv4 address.

    The address comes from ``__get_from_urls()``; when that returns the
    string ``"None"`` the Emby server configured in the module-level
    ``__url`` / ``__username`` / ``__passwd`` settings is asked instead.
    The record is added when none exists, and updated when exactly one
    exists whose value differs from the current address.

    :param domain_name: domain that owns the record
    :param rr: host record (left-most label) of the record
    :param record_type: record type; "A" for IPv4. The original notes say
        to use "AAAA" when the address is IPv6.
    """
    # NOTE(review): the looked-up subdomain is built as "<rr>.<domain_name>";
    # whether that is the right query for rr="@" has not been verified.
    subdomain = rr + "." + domain_name

    # Current dynamic IPv4 address.
    remote_addr = __get_from_urls()
    if "None" == remote_addr:
        remote_addr = RemoteIp(url=__url, username=__username, passwd=__passwd).get_remote_addr()
    print("current ipv4: " + remote_addr)

    aliyun_client = AliyunOperation(access_key_id=__accessKeyId, access_secret=__accessSecret)
    content = aliyun_client.describe_subdomain_records(subdomain, domain_name)
    print("current desc domain content: " + str(content))
    if content is None:
        # The lookup failed, so we cannot tell whether a record exists;
        # adding or updating blindly could create a wrong record.
        print("could not query records for " + subdomain + ", nothing changed")
        return

    if 0 == content.total_count:
        # No record yet: create it.
        aliyun_client.add_record(domain_name, rr, record_type, remote_addr)
    elif 1 == content.total_count:
        # Exactly one record: update it only when the address has changed.
        source = content.domain_records.record[0]
        if remote_addr != source.value:
            aliyun_client.update_record(record_type, remote_addr, rr, source.record_id)
    else:
        # Several records share this subdomain. Updating all of them is not
        # recommended because it can easily overwrite the wrong record, so
        # nothing is changed here.
        print("found " + str(content.total_count) + " records for " + subdomain + ", nothing changed")
def _extract_ipv4(text):
    """Return the first valid dotted-quad IPv4 address in ``text``, or ``None``."""
    import ipaddress
    import re

    for candidate in re.findall(r"(?<!\d)(?:\d{1,3}\.){3}\d{1,3}(?!\d)", text):
        try:
            ipaddress.IPv4Address(candidate)
        except ValueError:
            # Looks like an address but is not one (e.g. an octet above 255).
            continue
        return candidate
    return None


def __get_from_urls():
    """Look up the public IPv4 address through external "what is my IP" services.

    The services are tried in this order, and the first one that answers
    with status 200 and a body containing an IPv4 address wins:
        "https://4.ipw.cn",
        "https://ip.3322.net",
        "https://myip4.ipip.net",
        "https://ddns.oray.com/checkip"

    A service that cannot be reached, times out, or answers without an
    address is skipped so that the remaining services still get a chance.

    :return: the IPv4 address as a string, or the string "None" when no
        service produced one
    """
    services = (
        "https://4.ipw.cn",
        "https://ip.3322.net",
        "https://myip4.ipip.net",
        "https://ddns.oray.com/checkip",
    )
    for service in services:
        try:
            response = requests.get(service, timeout=10)
        except requests.RequestException:
            continue
        if 200 != response.status_code:
            continue
        # Some services wrap the address in descriptive text, so the address
        # is searched for in the body instead of being split out by position.
        address = _extract_ipv4(response.text)
        if address is not None:
            return address
    return "None"
if __name__ == '__main__':
    import os

    # Settings read by __execute(). Each value can be overridden through an
    # environment variable so that real credentials do not have to be written
    # into this file; the placeholders below are used when a variable is unset.
    # Emby server used as the fallback source of the public address.
    __url = os.environ.get("EMBY_URL", "http://127.0.0.1:8096")
    __username = os.environ.get("EMBY_USERNAME", "emby")
    __passwd = os.environ.get("EMBY_PASSWORD", "emby_passwd")
    # Alibaba Cloud AccessKey pair used for the DNS API calls.
    __accessKeyId = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID", "access_key_id")
    __accessSecret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET", "access_key_secret")
    __execute()