1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
| import requests import pickle import argparse import logging import sys from typing import * import re import os
# User-Agent mimicking the WeChat built-in browser on Windows; the 12355.net
# endpoints expect requests to look like they come from WeChat.
wechat_user_agent = 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116' \
                    ' Safari/537.36 QBCore/4.0.1326.400 QQBrowser/9.0.2524.400 Mozilla/5.0 (Windows NT 6.1; WOW64) ' \
                    'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2875.116 Safari/537.36 NetType/WIFI ' \
                    'MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x63010200)'
# Root logger; handlers are attached once in configure_logging().
root = logging.root
# One shared session so cookies persist across all HTTP calls in this run.
global_session = requests.session()
# Captures the "scheme://host" prefix of an http(s) URL that has a path.
host_schemas = re.compile(r'^(https?://[^/]+)/.*$')
def configure_logging(verbose: bool = False) -> logging.Logger:
    """Attach stdout (DEBUG/INFO) and stderr (WARNING+) handlers to the root logger.

    :param verbose: when True, DEBUG records are shown on stdout as well.
    :return: the configured root logger.
    """

    class ScreenOutputFilter(logging.Filter):
        # Restrict stdout to DEBUG/INFO so warnings and errors appear only on stderr.
        def filter(self, record: logging.LogRecord) -> int:
            return record.levelno in (logging.DEBUG, logging.INFO)

    formatter = logging.Formatter(
        '[%(asctime)s] [%(levelname)s] [%(name)s] (%(filename)s:%(lineno)d) %(message)s')

    out_handler = logging.StreamHandler(sys.stdout)
    out_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
    out_handler.setFormatter(formatter)
    out_handler.addFilter(ScreenOutputFilter())

    err_handler = logging.StreamHandler(sys.stderr)
    err_handler.setLevel(logging.WARNING)
    err_handler.setFormatter(formatter)

    root.addHandler(out_handler)
    root.addHandler(err_handler)
    # The root stays at DEBUG; per-handler levels do the actual filtering.
    root.setLevel(logging.DEBUG)
    return root
def parse_args(args=None):
    """Parse command-line options.

    :param args: optional explicit argv list (defaults to sys.argv[1:]).
    :return: the parsed argparse.Namespace.
    """
    parser = argparse.ArgumentParser()
    # (flags, keyword arguments) table keeps the option definitions compact.
    options = [
        ('--mid',
         dict(required=True, type=int, help='MID from MyProfile response')),
        ('--save_chapter_file',
         dict(type=str, default='chapter_id.pkl',
              help='A pickle file storing the finished chapter IDs to prevent redundant request')),
        ('--deploy_webpage_path',
         dict(type=str, default='/var/www/html/youth_study.html',
              help='Path for deploying the finished page, used for screenshotting')),
        ('--verbose',
         dict(action='store_true', default=False, help='Log verbosely')),
    ]
    for flag, kwargs in options:
        parser.add_argument(flag, **kwargs)
    return parser.parse_args(args)
def generate_webpage(study_page: str, title: str, deploy_path: str) -> None:
    """Write a minimal HTML page that full-screens the chapter's "finished" image.

    :param study_page: chapter URL (the .../m.html or .../index.html page).
    :param title: page title (typically the chapter name).
    :param deploy_path: filesystem path the generated HTML is written to.
    """
    # Only h5.cyol.com "daxuexi" chapter URLs are recognized; group(1) is the
    # directory prefix under which the static assets (images/, css/) live.
    study_url_pattern = re.compile(r'^(https?://h5\.cyol\.com/special/daxuexi/[^/]+/)(?:m|index)\.html$')
    page_match = re.match(study_url_pattern, study_page)
    assert page_match is not None, 'Unrecognized page %s' % study_page
    url_prefix = page_match.group(1)
    # "end.jpg" is the course's completion banner by the site's convention.
    finished_img_url = url_prefix + 'images/end.jpg'
    root.info('Testing end image: %s', finished_img_url)
    # Probe the image first (with the css page as Referer) so a missing asset
    # fails loudly here instead of silently deploying a blank page.
    _http_get(finished_img_url, url_prefix + 'css/index.css')
    root.info('Test OK')
    data = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<title>{title}</title>
</head>
<body style="margin: 0;">
<div style="background-image: url({finished_img_url}); position: absolute; background-size: 100% 100%; width: 100%; height: 100%;">
</div>
</body>
</html>"""
    with open(deploy_path, 'w') as f:
        f.write(data)
    root.info('HTML file generated to %s', deploy_path)
def _get_origin_from_referer(referer: str) -> str: match = re.search(host_schemas, referer) if match is None: raise ValueError('%s does not match the URL scheme', referer) return match.group(1)
def _merge_dict(origin: Dict[str, Any], default: Dict[str, Any]) -> Dict[str, Any]: ret = origin.copy() for key in default: if key not in origin: ret[key] = default[key] return ret
def _http_ops(method, url, referer, params, headers, retries, timeout, ensure_http_ok, **kwargs):
    """Perform an HTTP request with retries and WeChat-style default headers.

    :param method: bound session method (e.g. global_session.get / .post).
    :param url: request URL.
    :param referer: Referer header value; Origin is derived from it.
    :param params: query-string parameters, or None.
    :param headers: extra headers; these take precedence over the defaults.
    :param retries: maximum number of attempts (at least one is always made).
    :param timeout: per-attempt timeout in seconds.
    :param ensure_http_ok: when True, a non-OK status code counts as a failure.
    :param kwargs: forwarded to the session method (e.g. data= for POST).
    :return: the successful requests.Response.
    :raises Exception: the last exception once all attempts have failed.
    """
    default_header = {'User-Agent': wechat_user_agent,
                      'Referer': referer,
                      'Origin': _get_origin_from_referer(referer)}
    get_header = _merge_dict(headers or {}, default_header)
    last_ex = None
    # BUG FIX: with retries <= 0 the original while-loop body never ran and
    # the function fell through to "raise None" (a TypeError) after logging
    # type(None); max(retries, 1) guarantees at least one attempt.
    for _ in range(max(retries, 1)):
        try:
            resp = method(url, params=params, headers=get_header, timeout=timeout, **kwargs)
            if ensure_http_ok and not resp.ok:
                raise ValueError('HTTP Request failed with status code %d' % resp.status_code)
            return resp
        except Exception as ex:
            ex_type = type(ex)
            root.warning('HTTP Request "%s" failed: Exception %s.%s: %s',
                         url, ex_type.__module__, ex_type.__name__, str(ex))
            last_ex = ex
    ex_type = type(last_ex)
    root.error('HTTP Request "%s" failed: Max retry exceeded. Last exception: %s.%s: %s',
               url, ex_type.__module__, ex_type.__name__, str(last_ex))
    raise last_ex
def _http_get(url: str, referer: str, params: Optional[Dict[str, Any]] = None,
              headers: Optional[Dict[str, Any]] = None, retries: int = 5,
              timeout: float = 20, ensure_http_ok: bool = True) -> requests.Response:
    """Issue a GET via the shared session with retry and default-header handling."""
    root.debug('HTTP GET: %s with Referer: %s', url, referer)
    # Forward everything by keyword to make the mapping to _http_ops explicit.
    return _http_ops(global_session.get, url, referer,
                     params=params, headers=headers, retries=retries,
                     timeout=timeout, ensure_http_ok=ensure_http_ok)
def _http_post_url_encoded(url: str, referer: str, data: Optional[Dict[str, Any]] = None,
                           params: Optional[Dict[str, Any]] = None,
                           headers: Optional[Dict[str, Any]] = None, retries: int = 5,
                           timeout: float = 20, ensure_http_ok: bool = True) -> requests.Response:
    """POST form-urlencoded data via the shared session with retry handling."""
    root.debug('HTTP POST: %s with Referer: %s', url, referer)
    # requests encodes a dict passed as data= as application/x-www-form-urlencoded.
    body = {} if data is None else data
    return _http_ops(global_session.post, url, referer,
                     params=params, headers=headers, retries=retries,
                     timeout=timeout, ensure_http_ok=ensure_http_ok, data=body)
def do_study(mid: int, chapter_file: str) -> Tuple[str, str]:
    """Fetch the newest youth-study chapter, report it finished, and cache it.

    :param mid: member ID taken from the MyProfile response.
    :param chapter_file: pickle file caching already-finished chapter IDs.
    :return: (chapter name, chapter URL) of the current chapter.
    """
    root.info('Do the fucking study')
    root.info('Current mid: %d', mid)
    resp = _http_get('https://tuanapi.12355.net/questionnaire/getYouthLearningUrl?mid=%d' % mid,
                     'https://tuan.12355.net/wechat/index.html')
    root.debug(resp.text)
    url = resp.json()['youthLearningUrl']
    # The learning URL carries a one-time "sign" token used to log in below.
    sign_match = re.search(r'sign=([^&]*)', url)
    assert sign_match is not None, 'Failed to match sign from url %s' % url
    sign = sign_match.group(1)
    litemall_header = {'X-Litemall-Token': '', 'X-Litemall-IdentiFication': 'young'}
    # BUG FIX: the Referer for the next two requests was the typo'd host
    # "youtustudy.12355.net"; use the real host, consistent with the
    # saveHistory request below (Origin is derived from the Referer too).
    resp = _http_post_url_encoded('https://youthstudy.12355.net/apih5/api/user/get',
                                  'https://youthstudy.12355.net/h5/',
                                  headers=litemall_header, data={'sign': sign})
    root.debug(resp.text)
    token = resp.json()['data']['entity']['token']
    litemall_header['X-Litemall-Token'] = token
    resp = _http_get('https://youthstudy.12355.net/apih5/api/young/chapter/new',
                     'https://youthstudy.12355.net/h5/', headers=litemall_header)
    root.debug(resp.text)
    new_chapter = resp.json()
    # Load the local cache of chapters we have already reported as finished.
    if os.path.isfile(chapter_file):
        with open(chapter_file, 'rb') as f:
            chapters = pickle.load(f)
    else:
        chapters = dict()
    entity = new_chapter['data']['entity']
    chapter_id = entity['id']
    chapter_name = entity['name']
    chapter_url = entity['url']
    if chapter_id in chapters:
        # Already reported: return the cached info, skip the saveHistory call.
        chapter_info = chapters[chapter_id]
        root.info('Returned cached chapter, ID: %s, Name: %s, URL: %s',
                  chapter_id, chapter_info[0], chapter_info[1])
        return chapter_info
    chapters[chapter_id] = chapter_name, chapter_url
    # Report the chapter as watched *before* persisting the cache, so a failed
    # request is retried on the next run instead of being skipped forever.
    resp = _http_post_url_encoded('https://youthstudy.12355.net/apih5/api/young/course/chapter/saveHistory',
                                  'https://youthstudy.12355.net/h5/',
                                  data={'chapterId': chapter_id}, headers=litemall_header)
    root.debug(resp.text)
    with open(chapter_file, 'wb') as f:
        pickle.dump(chapters, f)
    return chapter_name, chapter_url
def main():
    """Entry point: run the study flow and deploy the finished page."""
    args = parse_args()
    configure_logging(args.verbose)
    try:
        chapter_name, chapter_url = do_study(args.mid, args.save_chapter_file)
        generate_webpage(chapter_url, chapter_name, args.deploy_webpage_path)
    except Exception as ex:
        root.critical('Process exit with exception %s', str(ex), exc_info=ex, stack_info=True)
        # BUG FIX: the original swallowed the failure and exited 0, so cron /
        # schedulers could not detect it; exit non-zero instead.
        sys.exit(1)


if __name__ == '__main__':
    main()
|