#!/usr/bin/env python3
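# Tool to fetch an RSS/Atom feed and list/download enclosure links
#  (e.g. podcast files) from it - see --help output below for all options.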
import urllib.request as ul, urllib.error as ule, urllib.parse as ulp
import pathlib as pl, datetime as dt, subprocess as sp
import os, sys, socket, re, unicodedata, math, calendar
import feedparser as fp
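# Runtime dependencies: "feedparser" module (above), aria2c binary for downloads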
p_err = lambda *a,**k: print('ERROR:', *a, file=sys.stderr, **k) or 1
str_norm = lambda v: unicodedata.normalize('NFKC', v).casefold()
class adict(dict):
    'dict subclass with key access via attributes (d.key == d["key"]).'
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__dict__ = self

# Custom accept_header is for github specifically - returns html instead of feed if */* is in there
conf = adict( version='1.0', socket_timeout=20.0,
    accept_header='application/atom+xml,application/rdf+xml,'
        'application/rss+xml,application/x-netcdf,application/xml;q=0.9,text/xml;q=0.2',
    feed_user_agent='rss-get/{ver} (github.com/mk-fg/fgtk#rss-get) feedparser/{ver_fp}' )
conf.feed_user_agent = conf.feed_user_agent.format(ver=conf.version, ver_fp=fp.__version__)
conf.aria2c_cmd = [ 'aria2c',
    '-s8', '-x8', '-k', '2M', '--console-log-level=warn', '--file-allocation=none',
    '--check-certificate=false', # workaround for aria2c or its tls lib, not important here
    '--user-agent', conf.feed_user_agent ]

class FeedError(Exception): pass
def feed_url_check(url):
    'Return if arg looks like an URL and not just a local path.'
    return re.search(r'^[\w\d]+:', str(url))
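# e.g. feed_url_check('https://host/feed') matches, while 'feeds/x.xml' or '/tmp/feed' do not
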
def feed_cache(url, path):
    'Cache feed into path and return it or raise FeedError.'
    path = pl.Path(path).resolve()
    if not feed_url_check(url) and pl.Path(url).resolve() == path: return path
    if path.exists(): return path
    try:
        req = ul.Request(url, headers={
            'User-Agent': conf.feed_user_agent, 'Accept': conf.accept_header })
        with ul.urlopen(req) as req: status, err, body = req.getcode(), req.reason, req.read()
    except ule.URLError as err_ex: status, err, body = 1000, str(err_ex), None
    if status >= 300:
        if body and len(body) < 250: err = repr(body.decode('utf-8', 'backslashreplace'))
        raise FeedError(f'Failed to cache feed (status={status}): {url!r} - {err}')
    path.write_bytes(body)
    return path

def feed_parse(url):
    'Return parsed feed with entries for url, raising FeedError on any issues.'
    feed = fp.parse( url,
        agent=conf.feed_user_agent, request_headers=dict(Accept=conf.accept_header) )
    status, bozo, bozo_err = (
        feed.get(k) for k in ['status', 'bozo', 'bozo_exception'] )
    fetch_fail = (not status and bozo) or (status or 1000) >= 400
    if not fetch_fail:
        if not feed.entries:
            fetch_fail = bozo = True
            bozo_err = 'No feed entries'
    elif not bozo and status is None and not feed_url_check(url): fetch_fail = False # file
    if fetch_fail:
        if bozo and not bozo_err: bozo_err = f'[no error msg (bozo={bozo})]'
        raise FeedError(f'Failed to fetch feed (status={status}): {url!r} - {str(bozo_err).strip()}')
    return feed

def feed_entry_enclosures(entry, ext_filter=None):
    'Return filtered list of feed entry enclosure URLs in a consistent type-order, if any.'
    enc_urls = list()
    if ext_filter: ext_filter = str_norm(ext_filter.strip())
    # Entries without enclosures are common - use .get() to avoid errors on those
    for enc in sorted(entry.get('enclosures') or list(), key=lambda enc: enc.get('type') or '-'):
        enc_href = enc.get('href')
        if not enc_href: continue
        if ext_filter:
            if '/' in ext_filter: # mime type like audio/mpeg
                enc_mime = enc.get('type')
                if enc_mime and str_norm(enc_mime.split(';', 1)[0].strip()) != ext_filter: continue
            else: # filename extension like mp3
                enc_ext = enc_href.rsplit('.', 1)[-1]
                if ( len(ext_filter) < 5 and len(enc_ext) < 5
                    and ext_filter != str_norm(enc_ext) ): continue
        enc_urls.append(enc_href)
    return enc_urls

def feed_entry_ts(entry):
    'Return tuple of (raw, parsed) timestamps for entry, latter one in datetime format.'
    for k in 'published created modified'.split():
        if not entry.get(k): continue
        # _parsed values are UTC struct_time - convert to local-tz datetime
        ts, ts_str = entry.get(f'{k}_parsed'), entry[k]
        if ts: ts = dt.datetime.fromtimestamp(calendar.timegm(ts))
        break
    else: ts = ts_str = None
    return ts_str, ts
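
# Ordered regexp substitutions to make feed titles safe(r) to use as filenames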
_name_subs = {
    r'[\\/]': '_', r'^\.+': '_', r'[\x00-\x1f]': '_', r':': '-_',
    r'<': '(', r'>': ')', r'\*': '+', r'[|!"]': '-', r'[\?\*]': '_',
    '[\'’]': '', r'\.+$': '_', r'\s+$': '', r'\s': '_', '#': 'N' }

def name_for_fs( name, fallback=...,
        _name_subs=list((re.compile(k), v) for k,v in _name_subs.items()) ):
    if not name and fallback is not ...: return fallback
    for sub_re, sub in _name_subs: name = sub_re.sub(sub, name)
    return name

def name_dedup(name, name_set, n=3):
    'Deduplicate name against name_set by adding/bumping .xNNN suffix, updating the set.'
    suff_n, suff_re, suff_fmt = 1, re.compile(r'\.x' + r'\d'*n + r'$'), f'.x{{:0{n}d}}'
    while name in name_set:
        if m := suff_re.search(name): name = name.rsplit(m.group(0))[0]
        name += suff_fmt.format(suff_n)
        suff_n += 1
    name_set.add(name)
    return name
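# e.g. name_dedup('ep1', {'ep1'}) -> 'ep1.x001', then with {'ep1', 'ep1.x001'} -> 'ep1.x002'
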
def ts_diff_str( ts, ts0=None, ext=None, sep='',
        _units=dict( h=3600, m=60, s=1,
            y=365.2422*86400, mo=30.5*86400, w=7*86400, d=1*86400 ) ):
    'Format time difference between two timestamps (or a timedelta) as a short human-readable string.'
    delta = abs(
        (ts - (ts0 or dt.datetime.now()))
        if not isinstance(ts, dt.timedelta) else ts )
    res, s = list(), delta.total_seconds()
    for unit, unit_s in sorted(_units.items(), key=lambda v: v[1], reverse=True):
        val = math.floor(s / float(unit_s))
        if not val: continue
        res.append('{:.0f}{}'.format(val, unit))
        if len(res) >= 2: break # two most-significant units are enough
        s -= val * unit_s
    if not res: return 'now'
    if ext: res.append(ext)
    return sep.join(res)
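# e.g. ts_diff_str(dt.timedelta(seconds=90)) == '1m30s', ts_diff_str(dt.timedelta(days=400)) == '1y1mo'
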
def main(args=None):
    import argparse
    parser = argparse.ArgumentParser(
        description='Parse feed at the URL and print/download enclosure links from it.'
            ' Prints item info by default, unless -n/--num and/or -L/--list-urls opts are specified.'
            ' Downloads to temp dir by default, passing filelist to aria2c'
            ' with some parallelization options, then moves all files to the current dir.')
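    # Example invocations (hypothetical feed URL), using the options defined below:
    #   rss-get -l https://example.com/feed.xml        - list item titles
    #   rss-get https://example.com/feed.xml           - print item info and URLs
    #   rss-get -n1 -t -v https://example.com/feed.xml - download latest enclosure
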
    group = parser.add_argument_group('Feed fetch/load options')
    group.add_argument('url', help='URL or file path to grab the feed from.')
    group.add_argument('--file', action='store_true',
        help='Do not try to prepend https:// to non-URL and non-existent paths.')
    group.add_argument('-o', '--out', metavar='file',
        help='Cache fetched feed to specified file and parse from there.'
            ' If file already exists (cached), it will be used instead of fetching URL.')

    group = parser.add_argument_group('File selection/download opts')
    group.add_argument('-n', '--num', metavar='n[-m]', action='append',
        help='Item(s) to print/download. Downloads them by default.'
            ' Can be either an integer item number (1 is latest), or N-M for first-last (inclusive).'
            ' Can be specified multiple times for multiple items/ranges.')
    group.add_argument('-f', '--file-type', metavar='ext',
        help='Enclosure file extension or content-type to match in case-insensitive manner.'
            ' Extension gets picked from the last slug of the enclosure URLs.'
            ' Examples: audio/mp3, audio/mpeg, mp3, ogg')
    group.add_argument('-t', '--ts-names', action='store_true',
        help='Use timestamp-derived "YY-mm-dd.HHMM.N.ext" filename, if possible.')
    group.add_argument('-v', '--headline-names', action='store_true',
        help='Derive enclosure filename from item headline, if possible.'
            ' Can be used together with -t/--ts-names to get timestamp + headline filename.')
    group.add_argument('-p', '--name-prefix', metavar='prefix',
        help='Prefix to put at the start of every downloaded filename, auto-separated by a dot.')
    group.add_argument('--max-filename-len', type=int, metavar='n', default=90,
        help='Maximum length for filenames produced by -t and/or -v opts (in characters, not bytes).'
            ' Anything beyond that will be cut and replaced by "--" before dot-extension suffix.'
            ' Default: %(default)s')
    group.add_argument('-i', '--inplace', action='store_true',
        help='Download files in-place, instead of using tmpdir and then moving them all on success.')

    group = parser.add_argument_group('Listing and output modes')
    group.add_argument('-l', '--list', action='store_true',
        help='List feed item titles only, one per line.')
    group.add_argument('-m', '--list-meta', metavar='fields', default='{e.title}',
        help='Only with --list - specify output format for each item/line.'
            ' Python str formatting is used, with "e" being feed entry object from feedparser module.'
            ' See https://pythonhosted.org/feedparser/ for info on available RSS/Atom Elements.'
            ' Default: %(default)s')
    group.add_argument('-L', '--list-urls', action='store_true',
        help='Print all or selected (by -n/--num) item URLs (one per line) instead of downloading.')
    group.add_argument('-C', '--list-curl', action='store_true',
        help='List curl commands for downloading all selected files individually and exit.')
    group.add_argument('-A', '--list-aria2', action='store_true',
        help='Print filelist for downloading all selected files with aria2c and exit.')

    opts = parser.parse_args(sys.argv[1:] if args is None else args)

    if opts.max_filename_len < 16:
        parser.error( '--max-filename-len must be at'
            f' least 16 chars (parsed as {opts.max_filename_len})' )

    ranges = list()
    for v in opts.num or list():
        m = re.search(r'^(\d+)(?:-(\d+))?$', v)
        if not m: parser.error(f'Invalid format for -n/--num option, should be N[-M]: {v!r}')
        a, b = int(m.group(1)) or 1, int(m.group(2) or 0)
        if not b: b = a
        elif b < a: a, b = b, a
        ranges.append((a, b))

    feed_url = opts.url
    if ( not opts.file
            and not feed_url_check(feed_url)
            and not pl.Path(feed_url).exists() ):
        feed_url = f'https://{feed_url}'

    if conf.socket_timeout is not None:
        socket.setdefaulttimeout(conf.socket_timeout)
    try:
        if opts.out: feed_url = feed_cache(feed_url, opts.out)
        feed = feed_parse(feed_url)
    except FeedError as err: return p_err(err)

    entries = list(enumerate(feed.entries, 1)) # 1 is the latest entry
    ts0, n0, file_urls = None, entries[0][0], dict()
    for n, e in reversed(entries): # oldest-first, to print ts-diffs between items
        enc_urls = feed_entry_enclosures(e, opts.file_type)
        ts_str, ts = feed_entry_ts(e)

        if opts.list or not ranges: # print-info-only modes
            if not opts.list:
                ts0, ts_diff = ts, f' +{ts_diff_str(ts - ts0)}' if ts and ts0 else None
                ts_diff_last = f' -{ts_diff_str(ts)}' if ts and n == n0 else None
                print(f'--- [{n: >2d}] {e.title}')
                print(f' [{ts_str}]{ts_diff or ""}{ts_diff_last or ""}')
                if url := e.get('link'): print(f' {url}')
                for url in enc_urls: print(f' {url}')
                print()
            else: print(opts.list_meta.format(e=e))
            continue

        elif ranges: # filter items by -n/--num numbers/ranges
            for a, b in ranges:
                if n < a: continue
                if b:
                    if n > b: continue
                elif n != a: continue
                break # n in range
            else: continue

        if not enc_urls: continue # only care about enclosures here
        url = enc_urls[0]
        if opts.list_urls: print(url)
        else: file_urls[url] = adict(title=e.title.strip(), ts=ts, n=n)

    if not file_urls: return

    file_map, name_set = dict(), set()
    for url, meta in file_urls.items():
        file_map[url] = None # downloader will pick a filename from the URL
        name = list()
        if opts.ts_names and meta.ts:
            # 1000-n suffix makes newer entries sort later within the same minute
            name.append(meta.ts.strftime(f'%y-%m-%d.%H%M.{1000-meta.n:03d}'))
        if opts.headline_names and meta.title:
            name.append(f'{name_for_fs(meta.title)}')
        if not name: continue
        name = '_-_'.join(name)
        if len(name) > opts.max_filename_len:
            name = name[:opts.max_filename_len] + '--'
        name = name_dedup(name, name_set)
        ext = pl.Path(ulp.urlparse(url).path).name.rsplit('.')[-1]
        file_map[url] = f'{name}.{ext}'

    if opts.list_curl:
        for url, fn in file_map.items():
            curl_opts = '-fL' # fail on http errors, follow redirects
            if not fn: curl_opts += 'OJ' # filename from URL/content-disposition
            else: curl_opts += f"o '{fn}'"
            print(f" curl {curl_opts} '{url}'")
        return

    # aria2c input-file format: URL line, then whitespace-indented per-file option lines
    file_list = list()
    for url, fn in file_map.items():
        file_list.append(f'{url}\n')
        if fn: file_list.append(f' out={fn}\n')
    file_list = ''.join(file_list)
    if opts.list_aria2: return print(file_list.strip())

    if opts.inplace:
        return sp.run(
            [*conf.aria2c_cmd, '--input-file', '-'],
            input=file_list.encode(), check=True ) and None

    import shutil, tempfile
    p_dst_dir = pl.Path.cwd()
    with tempfile.TemporaryDirectory(prefix=f'.rss-get.{os.getpid()}.') as p:
        p_tmpdir = pl.Path(p)
        sp.run(
            [*conf.aria2c_cmd, '--input-file', '-'],
            cwd=p_tmpdir, input=file_list.encode(), check=True )
        # Move all downloaded files to cwd, adding -p/--name-prefix if not already there
        for p in sorted(p_tmpdir.iterdir(), key=lambda p: p.name):
            p_dst = p_dst_dir
            if ( (pre := opts.name_prefix) and not
                    str_norm(fn := p.name).startswith(str_norm(pre)) ): p_dst /= f'{pre}.{fn}'
            shutil.move(p, p_dst)

if __name__ == '__main__': sys.exit(main())