| import requests import pickle import argparse import logging import sys from typing import * import re import os
# User-Agent header value sent by the HTTP helpers in this file. The pieces
# are adjacent literals, so they are concatenated at compile time.
wechat_user_agent = (
    'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116'
    ' Safari/537.36 QBCore/4.0.1326.400 QQBrowser/9.0.2524.400 Mozilla/5.0 (Windows NT 6.1; WOW64) '
    'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2875.116 Safari/537.36 NetType/WIFI '
    'MicroMessenger/ WindowsWechat(0x63010200)'
)

# The root logger; every function in this file logs through it.
root = logging.getLogger()

# One shared session, so cookies and connections persist across requests.
global_session = requests.Session()

# Captures "<scheme>://<host>" from an http(s) URL that has a path part.
host_schemas = re.compile(r'^(https?://[^/]+)/.*$')
def configure_logging(verbose: bool = False) -> logging.Logger:
    """Attach console handlers to the root logger and return it.

    A stdout handler passes only DEBUG and INFO records (its threshold is
    DEBUG when *verbose* is true, INFO otherwise); a stderr handler takes
    WARNING and above. Both handlers share the same format.

    Args:
        verbose: When true, DEBUG records are also written to stdout.

    Returns:
        The root logger, with its own level set to DEBUG.
    """

    class ScreenOutputFilter(logging.Filter):
        """Let only DEBUG and INFO records through."""

        def filter(self, record: logging.LogRecord) -> int:
            level = record.levelno
            return level == logging.DEBUG or level == logging.INFO

    formatter = logging.Formatter(
        '[%(asctime)s] [%(levelname)s] [%(name)s] (%(filename)s:%(lineno)d) %(message)s'
    )

    out_handler = logging.StreamHandler(sys.stdout)
    out_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
    out_handler.setFormatter(formatter)
    out_handler.addFilter(ScreenOutputFilter())

    err_handler = logging.StreamHandler(sys.stderr)
    err_handler.setLevel(logging.WARNING)
    err_handler.setFormatter(formatter)

    for handler in (out_handler, err_handler):
        root.addHandler(handler)
    # The handlers do the real filtering, so the logger itself lets
    # everything through.
    root.setLevel(logging.DEBUG)
    return root
def parse_args(args=None):
    """Parse the command-line options of the script.

    Args:
        args: Argument list to parse; ``None`` makes argparse read
            ``sys.argv``.

    Returns:
        An ``argparse.Namespace`` with ``mid``, ``save_chapter_file``,
        ``deploy_webpage_path`` and ``verbose``.
    """
    option_specs = (
        ('--mid', dict(required=True, type=int,
                       help='MID from MyProfile response')),
        ('--save_chapter_file', dict(
            type=str, default='chapter_id.pkl',
            help='A pickle file storing the finished chapter IDs to prevent redundant request')),
        ('--deploy_webpage_path', dict(
            type=str, default='/var/www/html/youth_study.html',
            help='Path for deploying the finished page, used for screenshotting')),
        ('--verbose', dict(action='store_true', default=False,
                           help='Log verbosely')),
    )

    cli = argparse.ArgumentParser()
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    return cli.parse_args(args)
def generate_webpage(study_page: str, title: str, deploy_path: str):
    """Write a static HTML page showing the chapter's "finished" image.

    The end-image URL is derived from *study_page*, fetched once to check it
    is reachable, and then embedded as the full-page background of a minimal
    HTML document written to *deploy_path*.

    Args:
        study_page: Chapter URL; must look like
            ``http(s)://h5.cyol.com/special/daxuexi/<id>/m.html`` (or
            ``index.html``).
        title: Text for the page ``<title>``.
        deploy_path: File path the HTML document is written to.

    Raises:
        ValueError: If *study_page* does not match the expected URL shape.
        Exception: Whatever ``_http_get`` raises when the image cannot be
            fetched.
    """
    import html  # function-scope import: the module header does not import it

    study_url_pattern = re.compile(r'^(https?://h5\.cyol\.com/special/daxuexi/[^/]+/)(?:m|index)\.html$')
    page_match = study_url_pattern.match(study_page)
    # Explicit raise instead of `assert`, which disappears under `python -O`.
    if page_match is None:
        raise ValueError('Unrecognized page %s' % study_page)

    url_prefix = page_match.group(1)
    finished_img_url = url_prefix + 'images/end.jpg'
    root.info('Testing end image: %s', finished_img_url)
    # The chapter's stylesheet URL is sent as the Referer for the probe.
    _http_get(finished_img_url, url_prefix + 'css/index.css')
    root.info('Test OK')

    # Both values originate from a remote API response, so escape them
    # before placing them into markup.
    safe_title = html.escape(title)
    safe_img_url = html.escape(finished_img_url, quote=True)
    data = f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8" />
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8">
    <title>{safe_title}</title>
</head>
<body style="margin: 0;">
    <div style="background-image: url({safe_img_url}); position: absolute; background-size: 100% 100%; width: 100%; height: 100%;">
    </div>
</body>
</html>"""
    # The document declares UTF-8, so write it as UTF-8 regardless of the
    # platform's default encoding.
    with open(deploy_path, 'w', encoding='utf-8') as f:
        f.write(data)
    root.info('HTML file generated to %s', deploy_path)
def _get_origin_from_referer(referer: str) -> str:
    """Return the ``scheme://host`` prefix of *referer*.

    The value is used as the ``Origin`` header that accompanies a given
    ``Referer``.

    Args:
        referer: An http(s) URL. The module-level ``host_schemas`` pattern
            requires a ``/`` after the host.

    Returns:
        The part of *referer* captured by the first group of
        ``host_schemas``.

    Raises:
        ValueError: If *referer* does not match ``host_schemas``.
    """
    match = host_schemas.search(referer)
    if match is None:
        # Format the message here: exceptions, unlike logging calls, do not
        # interpolate extra positional arguments.
        raise ValueError('%s does not match the URL scheme' % (referer,))
    return match.group(1)
def _merge_dict(origin: Dict[str, Any], default: Dict[str, Any]) -> Dict[str, Any]: ret = origin.copy() for key in default: if key not in origin: ret[key] = default[key] return ret
def _http_ops(method, url, referer, params, headers, retries, timeout, ensure_http_ok, **kwargs): default_header = {'User-Agent': wechat_user_agent, 'Referer': referer, 'Origin': _get_origin_from_referer(referer)} get_header = _merge_dict(headers or {}, default_header) last_ex = None while retries > 0: try: resp = method(url, params=params, headers=get_header, timeout=timeout, **kwargs) if ensure_http_ok and not resp.ok: raise ValueError('HTTP Request failed with status code %d' % resp.status_code) return resp except Exception as ex: ex_type = type(ex) root.warning('HTTP Request "%s" failed: Exception %s.%s: %s', url, ex_type.__module__, ex_type.__name__, str(ex)) last_ex = ex retries -= 1 ex_type = type(last_ex) root.error('HTTP Request "%s" failed: Max retry exceeded. Last exception: %s.%s: %s', url, ex_type.__module__, ex_type.__name__, str(last_ex)) raise last_ex
def _http_get(url: str, referer: str, params: Optional[Dict[str, Any]] = None,
              headers: Optional[Dict[str, Any]] = None, retries: int = 5, timeout: float = 20,
              ensure_http_ok: bool = True) -> requests.Response:
    """Send a GET request through the shared session, with retries.

    All arguments are handed to ``_http_ops`` together with
    ``global_session.get``; see that function for their meaning.

    Returns:
        The response returned by ``_http_ops``.
    """
    root.debug('HTTP GET: %s with Referer: %s', url, referer)
    response = _http_ops(
        method=global_session.get,
        url=url,
        referer=referer,
        params=params,
        headers=headers,
        retries=retries,
        timeout=timeout,
        ensure_http_ok=ensure_http_ok,
    )
    return response
def _http_post_url_encoded(url: str, referer: str, data: Optional[Dict[str, Any]] = None,
                           params: Optional[Dict[str, Any]] = None,
                           headers: Optional[Dict[str, Any]] = None, retries: int = 5,
                           timeout: float = 20, ensure_http_ok: bool = True) -> requests.Response:
    """Send a POST request with a form body through the shared session.

    *data* is passed to ``global_session.post`` as its ``data`` argument; a
    missing or empty value is replaced by an empty dict. The remaining
    arguments are handed to ``_http_ops``.

    Returns:
        The response returned by ``_http_ops``.
    """
    root.debug('HTTP POST: %s with Referer: %s', url, referer)
    form_fields = data if data else {}
    response = _http_ops(
        global_session.post,
        url,
        referer,
        params,
        headers,
        retries,
        timeout,
        ensure_http_ok,
        data=form_fields,
    )
    return response
def do_study(mid: int, chapter_file: str) -> Tuple[str, str]:
    """Mark the newest chapter as studied and return its name and URL.

    Steps: request the learning URL for *mid*, extract its ``sign`` query
    value, exchange the sign for a token, fetch the newest chapter and, if
    that chapter is not yet recorded in *chapter_file*, post a study-history
    entry and record the chapter.

    Args:
        mid: Member id placed into the first request's query string.
        chapter_file: Path of a pickle file mapping chapter id to
            ``(name, url)`` for chapters already handled. It is created on
            the first successful run.

    Returns:
        ``(chapter_name, chapter_url)``; for an already recorded chapter the
        stored entry is returned and no history request is sent.

    Raises:
        ValueError: If the learning URL contains no ``sign`` parameter.
        Exception: Whatever the HTTP helpers, JSON decoding or key lookups
            raise on an unexpected response.
    """
    root.info('Do the fucking study')
    root.info('Current mid: %d', mid)

    # Step 1: the learning URL carries the sign needed for authentication.
    resp = _http_get('https://tuanapi.12355.net/questionnaire/getYouthLearningUrl?mid=%d' % mid,
                     'https://tuan.12355.net/wechat/index.html')
    root.debug(resp.text)
    url = resp.json()['youthLearningUrl']
    sign_match = re.search(r'sign=([^&]*)', url)
    # Explicit raise instead of `assert`, which disappears under `python -O`.
    if sign_match is None:
        raise ValueError('Failed to match sign from url %s' % url)
    sign = sign_match.group(1)

    # Step 2: exchange the sign for a token.
    # NOTE(review): the Referer host below is spelled "youtustudy" while the
    # API host and the last request use "youthstudy"; left unchanged because
    # it is sent on the wire -- confirm whether it is a typo.
    litemall_header = {'X-Litemall-Token': '', 'X-Litemall-IdentiFication': 'young'}
    resp = _http_post_url_encoded('https://youthstudy.12355.net/apih5/api/user/get',
                                  'https://youtustudy.12355.net/h5/',
                                  headers=litemall_header, data={'sign': sign})
    root.debug(resp.text)
    token = resp.json()['data']['entity']['token']
    litemall_header['X-Litemall-Token'] = token

    # Step 3: fetch the newest chapter.
    resp = _http_get('https://youthstudy.12355.net/apih5/api/young/chapter/new',
                     'https://youtustudy.12355.net/h5/', headers=litemall_header)
    root.debug(resp.text)
    new_chapter = resp.json()

    # Step 4: load the record of chapters that were already handled.
    if os.path.isfile(chapter_file):
        # SECURITY: pickle.load can execute arbitrary code; only point
        # chapter_file at a file this script wrote itself.
        with open(chapter_file, 'rb') as f:
            chapters = pickle.load(f)
    else:
        chapters = dict()

    chapter_id = new_chapter['data']['entity']['id']
    chapter_name = new_chapter['data']['entity']['name']
    chapter_url = new_chapter['data']['entity']['url']
    if chapter_id in chapters:
        chapter_info = chapters[chapter_id]
        root.info('Returned cached chapter, ID: %s, Name: %s, URL: %s', chapter_id,
                  chapter_info[0], chapter_info[1])
        return chapter_info

    # Step 5: report the chapter as studied; the record is persisted only
    # after this request has succeeded.
    chapters[chapter_id] = chapter_name, chapter_url
    resp = _http_post_url_encoded('https://youthstudy.12355.net/apih5/api/young/course/chapter/saveHistory',
                                  'https://youthstudy.12355.net/h5/',
                                  data={'chapterId': chapter_id}, headers=litemall_header)
    root.debug(resp.text)
    with open(chapter_file, 'wb') as f:
        pickle.dump(chapters, f)
    return chapter_name, chapter_url
def main() -> int:
    """Run the study flow and deploy the result page.

    Parses the command line, configures logging, calls ``do_study`` and
    passes the chapter URL and name it returns to ``generate_webpage``.

    Returns:
        0 on success, 1 if any step raised; the failure is logged at
        CRITICAL level with its traceback.
    """
    args = parse_args()
    configure_logging(args.verbose)
    try:
        finished_img = do_study(args.mid, args.save_chapter_file)
        # do_study returns (name, url); generate_webpage wants (url, name, path).
        generate_webpage(finished_img[1], finished_img[0], args.deploy_webpage_path)
    except Exception as ex:
        # Top-level boundary: log everything, then report failure to the
        # caller instead of ending with a success status.
        root.critical('Process exit with exception %s', str(ex), exc_info=ex, stack_info=True)
        return 1
    return 0


if __name__ == '__main__':
    # Propagate the result as the process exit status so schedulers and
    # shell scripts can detect a failed run.
    sys.exit(main())