蜜雪冰城兔单脚本
这个脚本的主要目的是通过多线程并发执行特定的HTTP请求来进行某种营销活动的操作。以下是对该脚本的解析:
导入模块
- requests:用于发送HTTP请求。
- threading:用于多线程操作。
- md5Encode:从`hashlib`模块导入,用于生成MD5哈希值。
固定变量区
- marketingId:营销活动的ID。
- tasks_num:总任务数,设置为100,表示需要执行100次任务。
- threads_num:最大线程数,设置为30,表示最多同时运行30个线程。
自定义变量区
- token:某种认证或授权的令牌(需要通过抓包获取)。
- round:活动的轮次时间,这里设置为“13:00”。
- secretword:活动的关键字或密码,这里设置为“年度重磅 新品免单”。
headers
HTTP请求的头信息,包括`Access-Token`、`Referer`、`Host`等。
函数解析
exchange
- 用于执行一次请求操作:
1. 构建URL和参数。
2. 生成MD5签名(用于验证)。
3. 构建请求体并发送POST请求。
4. 打印请求的结果。
threading_run
- 控制多线程的执行:
1. 循环创建线程,直到任务完成。
2. 确保线程数不超过`threads_num`。
start_task
- 启动任务函数,调用`threading_run`。
主程序
- `if __name__ == '__main__':` 是脚本的入口点。调用`start_task`来启动所有任务。
这个脚本是为了模拟多个客户端并发访问某个API接口的操作,可能是为了参加某种秒杀、抢购或其他类似的营销活动。通过多线程提高请求的并发量,以增加成功率。
import requests
import threading
from hashlib import md5 as md5Encode
# ----固定变量区----
marketingId = '1816854086004391938'
# 任务数和线程数
tasks_num = 100 # 运行 100 次
threads_num = 30 # 最大线程数 30 个
# ----自定义变量区----
token = '抓包token'
round = '13:00'
secretword = '年度重磅 新品免单'
headers = {
'Access-Token': token,
'Referer': 'https://mxsa-h5.mxbc.net/',
'Host': 'mxsa.mxbc.net',
'Origin': 'https://mxsa-h5.mxbc.net',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.50(0x18003233) NetType/4G Language/zh_CN miniProgram/wx7696c66d2245d107',
'Content-type': 'application/json;charset=UTF-8',
}
def exchange():
try:
url = 'https://mxsa.mxbc.net/api/v1/h5/marketing/secretword/confirm'
param = f'marketingId={marketingId}&round={round}&s=2&secretword={secretword}c274bac6493544b89d9c4f9d8d542b84'
m = md5Encode(param.encode("utf8"))
sign = m.hexdigest()
body = {
"secretword": secretword,
"sign": sign,
"marketingId": marketingId,
"round": round,
"s": 2
}
res = requests.post(url, headers=headers, json=body)
print(f'任务开始: {res.text}')
except Exception as e:
print(f'任务失败: {e}')
def threading_run(tasks,threads):
for i in range(tasks):
if threading.active_count() t = threading.Thread(target=exchange)
t.start()
tasks -= 1
if threading.active_count() == threads + 1:
# 等待前面的进程结束后,再执行后面的代码,这里为1,因为程序本身即为一个线程
while threading.active_count() == threads + 1:
pass
while threading.active_count() != 1:
pass
def start_task():
threading_run(tasks_num, threads_num)
if __name__ == '__main__':
# 手动执行任务
start_task()