-
Notifications
You must be signed in to change notification settings - Fork 0
/
basehandler.py
142 lines (115 loc) · 4.23 KB
/
basehandler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# --*-- coding: utf-8 --*--
"""
请求处理基类
"""
from utils import json_response
from aiohttp import ClientSession
from aiohttp.web import Response, StreamResponse
import ujson
import mimetypes
from datetime import datetime, timedelta, date
class BaseHandler:
    """Base class for request handlers.

    Collects helpers shared by concrete handlers: a JSON POST client call,
    standard reply builders, number formatting, and week / month date ranges.
    """

    async def json_post_request(self, http_session: "ClientSession", url: str, data={}, cookie='') -> dict:
        """POST ``data`` to ``url`` with a JSON content type and return the decoded reply.

        :param http_session: client session used to send the request.
        :param url: target URL.
        :param data: request body, forwarded unchanged as ``data=`` to ``post``.
        :param cookie: value for the ``Cookie`` header; the header is omitted when empty.
        :return: the response body as decoded by ``resp.json()``.
        """
        # The default ``data`` dict is shared between calls; it is only read here.
        # NOTE(review): the body is sent via ``data=`` (not ``json=``), so callers
        # presumably pass an already-serialized JSON payload -- confirm.
        headers = {"Content-Type": "application/json"}
        if cookie:
            headers["Cookie"] = cookie
        # ``async with`` releases the connection even if decoding the body raises.
        async with http_session.post(url, data=data, headers=headers) as resp:
            return await resp.json()

    def reply_ok(self, response_data={}, extra={}):
        """Build the standard success reply.

        :param response_data: payload stored under the ``"data"`` key.
        :param extra: accepted for compatibility; not used by this method.
        :return: the result of ``json_response`` for the success envelope.
        """
        # The default dicts are shared between calls; they are never mutated here.
        envelope = {"code": 0, "message": "ok", "data": response_data}
        return json_response(data=envelope)

    async def replay_stream(self, byte_data, filename, request):
        """Send ``byte_data`` to the client as an ``.xlsx`` attachment.

        :param byte_data: complete file content; its length is used as Content-Length.
        :param filename: download name without extension; ``.xlsx`` is appended.
        :param request: the request being answered, passed to ``prepare``.
        :return: the ``StreamResponse`` after the body has been written.
        """
        resp = StreamResponse()
        resp.content_type = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        # The content type set above is never a text type, so the file is
        # always offered as a download rather than rendered inline.
        resp.headers['CONTENT-DISPOSITION'] = 'attachment; filename="{}"'.format(filename + '.xlsx')
        resp.content_length = len(byte_data)
        await resp.prepare(request)
        await resp.write(byte_data)
        await resp.write_eof()
        return resp

    def rounding(self, number):
        """Format ``number`` with two decimals, rounding halves away from zero.

        Plain ``"%.2f"`` formatting rounds the binary float, so values such as
        2.675 come out as ``"2.67"``. Rounding the decimal representation
        instead gives the expected ``"2.68"``.

        :param number: value to format.
        :return: the rounded value as a string with exactly two decimals.
        """
        from decimal import Decimal, InvalidOperation, ROUND_HALF_UP

        try:
            exact = Decimal(str(number))
            # NaN / infinity cannot be quantized; they use the fallback below.
            rounded = exact.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP) if exact.is_finite() else None
        except InvalidOperation:
            # Not parseable as a decimal (e.g. bool) or too large to quantize.
            rounded = None
        if rounded is None:
            return "%.2f" % number
        return format(rounded, "f")

    def percentage(self, number):
        """Format a ratio as a percentage string with two decimals.

        :param number: ratio to convert, e.g. ``0.1234``.
        :return: ``number * 100`` with two decimals followed by ``%``.
        """
        return "{0:0.2f}%".format(number * 100)

    def current_week(self):
        """Return the ISO dates of the current week, Monday through Sunday.

        :return: list of seven ``YYYY-MM-DD`` strings.
        """
        today = datetime.now().date()
        return [day.isoformat() for day in self._get_week(today)]

    def last_week(self):
        """Return the ISO dates of the previous week, Monday through Sunday.

        :return: list of seven ``YYYY-MM-DD`` strings.
        """
        a_week_ago = (datetime.now() - timedelta(7)).date()
        return [day.isoformat() for day in self._get_week(a_week_ago)]

    def last_last_week(self):
        """Return the ISO dates of the week before last, Monday through Sunday.

        :return: list of seven ``YYYY-MM-DD`` strings.
        """
        two_weeks_ago = (datetime.now() - timedelta(14)).date()
        return [day.isoformat() for day in self._get_week(two_weeks_ago)]

    def last_week_from_7_to_6(self):
        """Return the ISO dates of the previous week, Sunday through Saturday.

        :return: list of seven ``YYYY-MM-DD`` strings.
        """
        today = datetime.now().date()
        return [day.isoformat() for day in self._get_week_from_7_to_6(today)]

    def last_last_week_from_7_to_6(self):
        """Return the ISO dates of the week before last, Sunday through Saturday.

        :return: list of seven ``YYYY-MM-DD`` strings.
        """
        a_week_ago = (datetime.now() - timedelta(7)).date()
        return [day.isoformat() for day in self._get_week_from_7_to_6(a_week_ago)]

    def _get_week_from_7_to_6(self, date):
        """Yield the Sunday-to-Saturday week that precedes the week containing ``date``.

        :param date: any day of the reference week.
        :return: generator of seven consecutive dates starting on a Sunday.
        """
        # weekday() is 0 for Monday, so shift by one to count days since Sunday.
        days_since_sunday = (date.weekday() + 1) % 7
        previous_sunday = date - timedelta(days=7 + days_since_sunday)
        for offset in range(7):
            yield previous_sunday + timedelta(days=offset)

    def _get_week(self, date):
        """Yield the Monday-to-Sunday week containing ``date``.

        :param date: any day of the wanted week.
        :return: generator of seven consecutive dates starting on a Monday.
        """
        # weekday() is 0 for Monday, so it is already the distance to Monday.
        monday = date - timedelta(days=date.weekday())
        for offset in range(7):
            yield monday + timedelta(days=offset)

    def _curr_and_last_and_last_last_month(self):
        """Return start and end dates of the month before last, last month and this month.

        The current month ends at today rather than at its last calendar day.

        :return: six ``YYYY-MM-DD`` strings: (first, last) of the month before
            last, (first, last) of last month, then the first day of the
            current month and today.
        """
        day_format = "%Y-%m-%d"
        today = date.today()
        first_day_of_curr_month = today.replace(day=1)
        # Stepping one day back from a month's first day lands on the
        # previous month's last day, whatever its length.
        last_day_of_last_month = first_day_of_curr_month - timedelta(days=1)
        first_day_of_last_month = last_day_of_last_month.replace(day=1)
        last_day_of_last_last_month = first_day_of_last_month - timedelta(days=1)
        first_day_of_last_last_month = last_day_of_last_last_month.replace(day=1)
        boundaries = (
            first_day_of_last_last_month,
            last_day_of_last_last_month,
            first_day_of_last_month,
            last_day_of_last_month,
            first_day_of_curr_month,
            today,
        )
        return tuple(day.strftime(day_format) for day in boundaries)