已解决
调用api接口获取token
来自网友在路上 175875提问 提问时间:2023-09-21 21:53:21阅读次数: 75
最佳答案 问答题库758位专家为你答疑解惑
import os
import pyodbc
import requests
import openpyxl
from openpyxl.utils import get_column_letter
import base64
import time
import hmac
import hashlib
import json
import urllib
import random
import pandas as pd
import datetime# 获取当前日期和时间
current_datetime = datetime.datetime.now()
# 将日期和时间格式化为字符串
formatted_datetime1= current_datetime.strftime("%Y-%m-%d")
# 生成新的文件名
new_filename = xxx_{}.xlsx".format(formatted_datetime1)class SecretRobot:def __init__(self, secret='', webhook=''):self.secret = secretself.webhook = webhookself.timestamp = str(round(time.time() * 1000))self.headers = {"Content-Type": "application/json"}def send(self, message):self.get_sign()webhook = self.webhook + '×tamp=' + self.timestamp + '&sign=' + self.signdata = {"msgtype": "text","text": {"content": message}}resp = requests.post(webhook, data=json.dumps(data), headers=self.headers)resp.close()def get_sign(self):secret_enc = self.secret.encode('utf-8')string_to_sign = '{}\n{}'.format(self.timestamp, self.secret)string_to_sign_enc = string_to_sign.encode('utf-8')hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()self.sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))def send_dingding_message1(message, secret):webhook = 'xxx'# 创建 SecretRobot 对象robot = SecretRobot(secret=secret, webhook=webhook)# 发送消息robot.send(message)def get_order_source_name(order_source_id):# 连接数据库conn = pyodbc.connect('DRIVER={SQL Server};SERVER=xxx;DATABASE=metabase;UID=xxx;PWD=xxx')cursor = conn.cursor()# 执行查询语句,获取order_source_namecursor.execute(f"SELECT xxx FROM xxx WHERE xxx = '{xxx}'")row = cursor.fetchone()# 关闭数据库连接conn.close()if row is not None:return row[0]else:return ""# 设置计数器和暂停时间
count = 0
pause_time = random.randint(10, 12)username = 'xxx'
password = 'xxx'# 编码用户名和密码为 Base64 字符串
credentials = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("utf-8")
authorization = f"Basic {credentials}"# 连接数据库
conn = pyodbc.connect('DRIVER={SQL Server};SERVER=xxx;DATABASE=xxx;UID=xxx;PWD=xxx')
cursor = conn.cursor()url_template = 'xxx'# 执行查询语句,获取数据
cursor.execute("SELECT order_source_id, user_token_secret_access_key, order_source_country FROM rb_mercadolibre_site_user_info")
rows = cursor.fetchall()order_source_names = []processed_order_ids = [] # 存储已处理过的 order_source_id# 遍历每行数据
for row_idx, row in enumerate(rows, 2):order_source_id = row[0]if order_source_id in processed_order_ids:continueACCESS_TOKEN = row[1]order_source_country = row[2]processed_order_ids.append(order_source_id)order_source_id = int(order_source_id)# 构建接口入参payload = {"language": "zh_CN","input": {"customerId": 1,"orderSourceType": 70,"orderSourceId": order_source_id}}# 发送请求获取通tokenresponse = requests.post("xxx", json=payload, headers={"Authorization": authorization})if response.ok:json_data = response.json()data = json_data.get('data', {})ACCESS_TOKEN = data.get('userTokenSecretAccessKey', '')print(ACCESS_TOKEN)headers = {'Authorization': f'Bearer {ACCESS_TOKEN}'}# 计数器加一count += 1# 如果计数器达到了 5,则暂停 3 秒if count == 3:time.sleep(pause_time)count = 0#调用目标接口response_target = requests.get(url_template, headers=headers)if response_target.ok:# 处理返回结果response_data = response_target.json()print(response_data)# 判断是否满足条件if not response_data['status']['sell']['allow']:order_source_name = get_order_source_name(order_source_id) # 根据order_source_id获取对应的order_source_name# 将店铺名称添加到列表中order_source_names.append(order_source_name)else:print(f"由于token失效原因 failed {response_target.status_code}: {response_target.reason}")time.sleep(3)#调用目标接口response_target = requests.get(url_template, headers=headers)if response_target.ok:# 处理返回结果response_data = response_target.json()print(response_data)# 判断是否满足条件if not response_data['status']['sell']['allow']:order_source_name = get_order_source_name(order_source_id) # 根据order_source_id获取对应的order_source_name# 将店铺名称添加到列表中order_source_names.append(order_source_name)else:print(f"由于其它原因 failed {response.status_code}: {response.reason}")# 关闭数据库连接
conn.close()shop_count = len(order_source_names)msg1 = f"今天有{shop_count}家 KYC 店铺,分别是:\n"
msg1 += "\n".join([f" {order_source_name}" for order_source_name in order_source_names])secret1 = 'xxx'
send_dingding_message1(msg1,secret1)# 读取Excel文件
df = pd.read_excel(new_filename)# 获取"账号全程"列中与被KYC店铺列表相同的行索引
shop_rows = df[df['账号全称'].isin(order_source_names)].index# 修改对应行的"站点状态"列的值为"KYC"
df.loc[shop_rows, '站点状态'] = 'KYC'# 保存修改后的Excel文件
df.to_excel(new_filename, index=False)
- 导入所需的库,包括os、pyodbc、requests、openpyxl等。
- 定义一个名为SecretRobot的类,用于发送钉钉消息。其中包含了签名算法的实现。
- 定义一个函数send_dingding_message1,用于发送钉钉消息。该函数创建了一个SecretRobot对象,并调用其send方法发送消息。
- 定义一个函数get_order_source_name,用于从数据库中获取订单来源名称。
- 初始化计数器和暂停时间。
- 将数据库连接信息和授权信息存储在变量中。
- 连接数据库并执行查询语句,获取订单来源信息。
- 遍历获取的数据行,对每个订单来源进行验证操作。
- 根据订单来源信息构建接口入参,并发送请求获取令牌。
- 判断请求是否成功,如果成功则处理返回结果。
- 判断是否满足条件,如果不满足则获取订单来源名称并添加到列表中。
- 如果请求失败,则等待一段时间后重新发送请求。
- 遍历完所有订单来源信息后,关闭数据库连接。
- 统计满足条件的KYC店铺数量,并生成钉钉消息内容。
- 发送钉钉消息通知相关人员。
- 读取Excel文件。
- 获取与KYC店铺列表相匹配的行索引。
- 修改对应行的"站点状态"列的值为"KYC"。
- 保存修改后的Excel文件。
查看全文
99%的人还看了
相似问题
- alibaba店铺所有商品数据接口(alibaba.item_search_shop)
- shopee、亚马逊卖家如何安全给自己店铺测评?稳定测评环境是关键
- 1688店铺所有商品数据接口(1688.item_search_shop)
- 京东店铺所有商品数据接口(JD.item_search_shop)
- 关键词搜索1688商品数据接口(标题|主图|SKU|价格|优惠价|掌柜昵称|店铺链接|店铺所在地)
- 淘宝店铺所有商品数据接口及店铺商品数据分析
- 睿趣科技:抖音店铺怎么取名受欢迎
- 2023智慧云打印小程序源码多店铺开源版 +前端
- Redis入门 (店铺营业状态设置) --苍穹外卖day4
- 共享股东:让你的连锁店铺更有竞争力
猜你感兴趣
版权申明
本文"调用api接口获取token":http://eshow365.cn/6-10995-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!