20.multi_case07
20 multi
时间:2023-09-11 14:21:24
# coding:utf-8
import re
import ssl
import csv
import json
import time
import random
import asyncio
import aiohttp
import requests
from lxml import etree
from asyncio.queues import Queue
from aiosocksy import Socks5Auth
from aiosocksy.connector import ProxyConnector, ProxyClientRequest
class Common:
    """Shared crawler state, used as a plain namespace (never instantiated here)."""

    # Rows waiting to be crawled; main() fills it, the download workers drain it.
    task_queue = Queue()
    # Parsed coin records waiting to be pushed downstream by push_results().
    result_queue = Queue()
    # Total market value; overwritten by get_marketcap().
    market_cap_all = 0
    # Divisor applied to the RMB 24h volume; overwritten by get_currency_rate().
    currency_rate = 0
# Production intranet SOCKS5 proxies; download() picks one of these at random.
socks5_address_prod = [
    'socks5://10.1.100.253:1235',
    'socks5://10.1.100.51:1235',
    'socks5://10.1.100.70:1235',
    'socks5://10.1.100.205:1235',
    'socks5://10.1.100.73:1235'
]
# Office-network SOCKS5 proxies.
# NOTE(review): no code visible in this file reads this list, and
# 18.208.81.123 appears twice -- confirm whether either is intentional.
socks5_address_dev = [
    'socks5://18.208.81.123:1235',
    'socks5://34.197.217.25:1235',
    'socks5://52.20.255.43:1235',
    'socks5://34.237.163.87:1235',
    'socks5://18.208.81.123:1235',
    'socks5://52.0.114.155:1235'
]
# Proxy policy switch read by download(): "dev" requests go out directly,
# "Prod" requests are routed through socks5_address_prod.
DEPLOY_MODE = "dev"
async def session_get(session, url, socks):
    """Fetch ``url`` through ``session`` and return ``(body_text, status)``.

    Args:
        session: Client session exposing an aiohttp-style ``get``.
        url: Address to request.
        socks: SOCKS5 proxy URL, or ``None`` (download() passes ``None`` in
            "dev" mode).

    Returns:
        Tuple ``(text, status)``: the decoded response body and the HTTP
        status code.
    """
    auth = Socks5Auth(login='...', password='...')
    headers = {'User-Agent': 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}
    timeout = aiohttp.ClientTimeout(total=20)
    # ``async with`` releases the connection even when reading or decoding
    # the body raises; a bare ``await session.get(...)`` does not.
    async with session.get(
        url,
        proxy=socks,
        proxy_auth=auth,
        timeout=timeout,
        headers=headers,
        # NOTE(review): a directly constructed SSLContext does not verify
        # certificates by default -- confirm that this is intended.
        ssl=ssl.SSLContext(),
    ) as response:
        body = await response.text()
        return body, response.status
async def download(url):
    """Download ``url`` and follow one JavaScript redirect stub if present.

    The proxy is chosen from ``DEPLOY_MODE`` (compared case-insensitively):
    "dev" connects directly, "prod" uses a random entry of
    ``socks5_address_prod``.

    Args:
        url: Page address to fetch.

    Returns:
        Tuple ``(html, status)`` of the final response.

    Raises:
        ValueError: If ``DEPLOY_MODE`` is neither "dev" nor "prod".
            Previously this surfaced as an UnboundLocalError on ``socks``.
    """
    mode = DEPLOY_MODE.lower()
    if mode == "dev":
        socks = None
    elif mode == "prod":
        socks = random.choice(socks5_address_prod)
    else:
        raise ValueError("unsupported DEPLOY_MODE: {!r}".format(DEPLOY_MODE))

    connector = ProxyConnector()
    async with aiohttp.ClientSession(
        connector=connector,
        request_class=ProxyClientRequest
    ) as session:
        ret, status = await session_get(session, url, socks)
        # A short page that mentions window.location.href is treated as a
        # redirect stub: pull out the target and fetch that instead.
        if 'window.location.href' in ret and len(ret) < 1000:
            # Accept either quote style and optional spaces around "=";
            # the old split() raised IndexError on anything but ='...'.
            target = re.search(
                r"""window\.location\.href\s*=\s*(['"])(.*?)\1""", ret)
            if target:
                ret, status = await session_get(
                    session, target.group(2), socks)
        return ret, status
async def parse_html(cid, url, response):
    """Extract coin metrics from the HTML of a coin detail page.

    Every metric is best-effort: if its node is missing or its text cannot be
    converted, that key is left out of ``value`` and parsing continues.

    Args:
        cid: Coin identifier, copied into the result unchanged.
        url: Page address, copied into the result unchanged.
        response: HTML source of the page.

    Returns:
        dict with keys ``url``, ``cid``, ``time`` (Unix seconds at parse
        time) and ``value`` (dict holding the metrics that were extracted).
    """
    coin_info = {'url': url, 'cid': cid, 'time': int(time.time())}
    coin_value = {}
    tree = etree.HTML(response)

    def text_at(xpath, index=0):
        # Raises when the node (or the tree itself) is missing; the loop
        # below turns any such failure into "metric unavailable".
        return tree.xpath(xpath)[index].strip()

    def price():
        raw = text_at(
            '//div[@class="priceInfo"]/div[@class="sub"]/span[1]/text()'
        ).replace('$', '')
        return None if '?' in raw else float(raw)

    def updown():
        raw = text_at(
            '//div[@class="priceInfo"]/div[@class="sub smallfont"]/span[1]/text()'
        )
        return float(raw.replace('%', ''))

    def volume_24h():
        raw = text_at(
            '//div[@class="info"]/div[@class="charCell"][2]/div[2]/span/text()'
        )
        volume_rmb = float(raw.replace('¥', '').replace(',', ''))
        # Raises ZeroDivisionError while currency_rate is still 0, so the
        # field is skipped until a rate has been loaded.
        return int(volume_rmb / Common.currency_rate)

    def circulating_supply():
        raw = text_at(
            '//div[@class="info"]//div[@class="charCell"][1]/div[@class="val"]/text()'
        ).replace(',', '')
        if '?' in raw:
            return None
        # NOTE(review): the pattern needs at least one word character after
        # the digits, so a bare number would lose its last digit -- confirm
        # the page always appends a unit suffix.
        return int(re.match(r'^(\d+)(\w+)$', raw).group(1))

    def market_cap():
        # Reads the two fields extracted above; KeyError if either is absent.
        if coin_value['price'] and coin_value['circulating_supply']:
            return coin_value['price'] * coin_value['circulating_supply']
        return None

    def global_share():
        if not coin_value['market_cap']:
            return None
        share = coin_value['market_cap'] / Common.market_cap_all
        if share < 0.001:
            return '<0.1%'
        return str(round(share * 100, 2)) + '%'

    def circulation_rate():
        raw = text_at(
            '//div[@class="info"]//div[@class="charbox"][1]/div[@class="val"]/text()'
        )
        return None if '?' in raw else raw

    def turnover_rate():
        # Same node list as circulation_rate; this is its second entry.
        raw = text_at(
            '//div[@class="info"]//div[@class="charbox"][1]/div[@class="val"]/text()',
            1,
        )
        return None if '?' in raw else raw

    def issue_time():
        raw = text_at('//div[@class="infoList"]/div[1]/div[1]/span[2]/text()')
        return None if raw == '-' else raw

    def exchange_num():
        raw = text_at('//div[@class="infoList"]/div[3]/div[1]/span[2]/text()')
        return int(raw.replace('家', ''))

    def total_circulation():
        raw = text_at('//div[@class="infoList"]/div[2]/div[2]/span[2]/text()')
        return int(raw.replace(',', ''))

    # Order matters: market_cap and global_share read earlier results, and
    # the insertion order is what ends up in the emitted record.
    extractors = (
        ('price', price),
        ('updown', updown),
        ('volume_24h', volume_24h),
        ('circulating_supply', circulating_supply),
        ('market_cap', market_cap),
        ('global_share', global_share),
        ('circulation_rate', circulation_rate),
        ('turnover_rate', turnover_rate),
        ('issue_time', issue_time),
        ('exchange_num', exchange_num),
        ('total_circulation', total_circulation),
    )
    for key, extract in extractors:
        try:
            parsed = extract()
        except Exception:
            # Deliberate best-effort skip. ``Exception`` rather than
            # ``BaseException`` so SystemExit/KeyboardInterrupt still
            # propagate.
            continue
        if parsed is not None:
            coin_value[key] = parsed

    coin_info['value'] = coin_value
    return coin_info
async def down_and_parse_task(queue):
    """Worker coroutine: crawl every row in ``queue`` until it is empty.

    For each row the first two items are taken as ``(cid, url)``. The page is
    downloaded and parsed, and the result is put on ``Common.result_queue``.
    Each row gets up to three attempts; a row that keeps failing is dropped.

    Args:
        queue: Queue of rows whose first two items are ``cid`` and ``url``.

    Returns:
        None, once ``queue`` has no more items.
    """
    while True:
        try:
            row = queue.get_nowait()
        except asyncio.QueueEmpty:
            return  # nothing left to crawl
        try:
            cid, url = row[:2]
        except (TypeError, ValueError):
            # Malformed row: skip it instead of ending the whole worker.
            continue
        for _ in range(3):
            try:
                html, status = await download(url)
                if status != 200:
                    html, status = await download(url)
                # The site's "access control rejected your request" page:
                # fetch once more.
                if '访问控制拒绝了你的请求' in html:
                    html, status = await download(url)
                html_parse_result = await parse_html(cid, url, html)
                print(html_parse_result)
                await Common.result_queue.put(html_parse_result)
                break
            except Exception:
                # ``Exception`` rather than ``BaseException`` so cancellation
                # and SystemExit can still stop the worker.
                await asyncio.sleep(0.2)
async def push(data):
    """POST ``data`` as a JSON body to the local endpoint, with retries.

    Makes up to three attempts and stops at the first one that succeeds.
    Previously the loop had no exit on success, so every batch was posted
    three times. Failures are printed, not raised.

    Args:
        data: JSON-serialisable payload.

    Returns:
        None.
    """
    url = 'http://127.0.0.1:8000/aaa'
    for _ in range(3):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    url,
                    data=json.dumps(data)
                ) as response:
                    response.raise_for_status()
            return  # delivered; do not send the same batch again
        except Exception as e:
            await asyncio.sleep(0.2)
            print(e)
async def speed_monitor():
    """Print the task-queue drain rate every 5 seconds until it is empty."""
    while True:
        before = Common.task_queue.qsize()
        if before == 0:
            return
        await asyncio.sleep(5)
        after = Common.task_queue.qsize()
        print('=================')
        # Items taken off the queue per second over the last window.
        print('speed = ', (before - after) / 5)
async def monitor_finish():
    """Exit the process once only the housekeeping tasks are left.

    Polls the task count once a second; when it is 3 or fewer, waits another
    5 seconds and raises ``SystemExit``.

    Raises:
        SystemExit: Always, once the task count has dropped.
    """
    # asyncio.Task.all_tasks() was removed in Python 3.9; prefer the
    # module-level function and fall back only on interpreters without it.
    all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
    # NOTE(review): 3 presumably stands for this task plus push_results()
    # and time_limit() -- confirm against main().
    while len(all_tasks()) > 3:
        await asyncio.sleep(1)
    # Grace period before exiting (push_results() polls every 3 seconds).
    await asyncio.sleep(5)
    raise SystemExit()
async def push_results():
    """Forever: every 3 seconds, drain ``Common.result_queue`` and push it.

    Whatever is queued at wake-up is sent in one ``push()`` call. If that
    call raises, the traceback is printed and the batch is kept for the next
    round.
    """
    import traceback

    batch = []
    while True:
        try:
            await asyncio.sleep(3)
            # qsize() items are already present, so none of these can block.
            for _ in range(Common.result_queue.qsize()):
                batch.append(Common.result_queue.get_nowait())
            if batch:
                await push(batch)
                batch.clear()
        except Exception:
            # ``Exception`` rather than ``BaseException``: catching
            # cancellation/SystemExit here made this loop unstoppable.
            print(traceback.format_exc())
async def get_marketcap():
    """Fetch the global market-cap figure into ``Common.market_cap_all``.

    Uses the blocking ``requests`` client; main() awaits this before any
    worker task is created.

    Raises:
        requests.RequestException: On timeout, connection failure or a
            non-2xx status.
        KeyError, ValueError: If the payload does not have the expected shape.
    """
    url = 'https://dncapi.feixiaohao.com/api/home/global?webp=0'
    # Without a timeout a stalled server would hang start-up forever.
    response = requests.get(url, timeout=20)
    response.raise_for_status()
    response_json = json.loads(response.text)
    marketcap = response_json['data']['marketcapvol']
    Common.market_cap_all = int(marketcap)
async def get_currency_rate():
    """Fetch the exchange rate into ``Common.currency_rate``.

    Uses the blocking ``requests`` client; main() awaits this before any
    worker task is created.

    Raises:
        requests.RequestException: On timeout, connection failure or a
            non-2xx status.
        IndexError, KeyError, ValueError: If the payload does not have the
            expected shape.
    """
    url_rate = 'https://dncapi.feixiaohao.com/api/coin/web-rate/'
    # Without a timeout a stalled server would hang start-up forever.
    response = requests.get(url_rate, timeout=20)
    response.raise_for_status()
    # NOTE(review): entry 11 is selected by position; which currency it is
    # cannot be told from this file -- confirm against the API.
    currency_rate = json.loads(response.text)[11]['cny']
    Common.currency_rate = currency_rate
# Upper bound on total crawl time. The original note said 300 seconds; the
# wait used here is 280 seconds by default.
async def time_limit(seconds=280):
    """Stop the process after ``seconds`` have elapsed.

    Args:
        seconds: How long to wait before exiting. Defaults to 280.

    Raises:
        SystemExit: Always, once the wait is over.
    """
    await asyncio.sleep(seconds)
    raise SystemExit()
async def main():
    """Load the crawl list, fetch the global figures and start all tasks.

    Steps:
        1. Read ``feixiaohao_mapping_data.csv`` and enqueue every row whose
           second column starts with ``https``.
        2. Fetch the total market cap and the currency rate.
        3. Start 10 download workers plus the finish monitor, speed monitor,
           result pusher and time limiter.
    """
    # Inside a coroutine this is the loop that is running us, so main() no
    # longer depends on a global ``loop`` set by the script entry point.
    loop = asyncio.get_event_loop()

    # ``with`` closes the file; ``newline=''`` is what the csv module expects.
    with open(
            'feixiaohao_mapping_data.csv',
            encoding='utf-8',
            newline='') as csv_file:
        for row in csv.reader(csv_file):
            # Rows with fewer than two columns are skipped explicitly
            # (formerly an IndexError swallowed by a blanket except).
            if len(row) > 1 and row[1].startswith('https'):
                await Common.task_queue.put(row)
    print(Common.task_queue)

    await get_marketcap()
    print('总市值', Common.market_cap_all)
    await get_currency_rate()
    print('汇率', Common.currency_rate)

    for _ in range(10):
        loop.create_task(down_and_parse_task(Common.task_queue))
    loop.create_task(monitor_finish())
    loop.create_task(speed_monitor())
    loop.create_task(push_results())
    loop.create_task(time_limit())
if __name__ == '__main__':
    # ``loop`` is intentionally a module-level name.
    loop = asyncio.get_event_loop()
    # Schedule the bootstrap coroutine, then hand control to the loop.
    bootstrap_task = loop.create_task(main())
    loop.run_forever()
相关文章
- redhat下安装mysql 5.6.20,解压zip包,查看已经安装过的mysql,卸载rpm安装包,安装mysqlserver端和client,改动mysqlusername,登陆mysql,启动关闭mysql
- 关于软测面试的20个终极问题,春招软测人快来看..
- 2021-2022学年学英语报七年级第20期答案外研版基础版
- 《惢客创业日记》2019.06.20(周四)启用团队协作管理系统
- 数仓工具—Hive进阶之Statistics与Analyze Table命令(20)
- 【PAT乙级】1089 狼人杀-简单版 (20 分)
- FL studio 20简易入门教程 -- 第六篇 -- 调音台和自动化包络线
- 漏洞复现----20、Apache Solr RemoteStreaming 任意文件读取和SSRF
- 真希望我20几岁就知道的事-一书给20几岁人的建议
- 为数据科学和机器学习选择合适的笔记本电脑,完成数据科学和机器学习任务的完美笔记本电脑的 20 个必要条件
- 【转载】 NCCL(Nvidia Collective multi-GPU Communication Library) Nvidia英伟达的Multi-GPU多卡通信框架NCCL 学习;PCIe 速率调研
- 最好用的20个数据可视化工具(一)
- 1084 外观数列 (20 分)C语言
- 20.multi_协程方法抓取总阅读量
- 20.multi_case05
- 20.multi_case01