话说学习编码最好的方式就是读优秀的代码和写代码。 尤其是读优秀的代码,既然自己写的代码很垃圾,多读读人家大牛们的优秀作品,受一下熏陶,沾惹点仙灵之气也好。俗话说:读书破万卷,下笔如有神!
,俗话还说:熟读唐诗三百首,不会作诗也会吟!
,那我把大牛们的代码读几遍,是不是也会写出牛光闪闪的代码呢?
又扯远了,言归正传,昨天给大家介绍了一下SteemData Notify
,觉得是个挺有意思的产品:
今天呢,就来学习一下它的代码,看看具体是如何实现相关功能的。
代码功能模块 代码托管在github上,地址是: https://github.com/SteemData/notify.steemdata.com
通过简单分析,可以知晓代码分为应用(网站)端和后端
应用(网站)端 src/app.py
后端 src/worker.py
应用端: 使用Python + Flask + MongoDB 后端则主要使用: Python + Steem官方Python库 + MongoDB
我们这节着重分析后端的代码
后端代码 源码在这里https://github.com/SteemData/notify.steemdata.com/blob/master/src/worker.py
通过阅读main函数 我们可以知道代码分成三大逻辑块
blockchain worker
confirmation worker
notifier worker
顾名思义,分别区块链工作进程、确认进程、通知进程, 分别通过不同的命令行参数启动。 至于为何不使用线程?以我实际经验,官方的Python 库对线程不是特别友好,尤其是一些复杂情况,可能导致很多意想不到的问题,所以多跑俩进程貌似也没啥不好的。或许作者有别的方面的思量,就不得而知了。
这篇文章我们主要学习三者之一的Blockchain Worker
Blockchain Worker 函数名: run_blockchain_worker()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 b = Blockchain() types = [ 'account_update', 'change_recovery_account', 'request_account_recovery', 'transfer', 'transfer_from_savings', 'set_withdraw_vesting_route', 'withdraw_vesting', 'fill_order', 'fill_convert_request', 'fill_transfer_from_savings', 'fill_vesting_withdraw', ]
b = Blockchain() 定义了Blockchain 实例 其本质呢,就是封装了一些对steem 节点的API操作 其中一个基本的操作是:stream(self, filter_by: Union[str, list] = list(), *args, **kwargs)
简单的解释就是把steem区块链上发生的操作yield成操作流,然后可以针对不同的操作类型进行处理,比如说投票机器人就是Yield出来文章流,然后投票。
types = [ xxxx] 这一大堆,就是上述函数中的filter_by参数,用来过滤抽取我们需要的相关操作,忽略掉无关内容
有了上述对Blockchain以及filter_by的讲解,下面的代码就很好理解了
1 2 3 4 5 6 7 8 9 10 11 12 try: block = db.last_processed_block.find_one() start_block = int(block['block_num']) - 1 except Exception: start_block = None for op in b.stream(filter_by=types, start_block=start_block): processed = db.processed_blockchains.find({'_id': op['_id']}).count() if not processed: if parse_blockchain(op): db.processed_blockchains.insert_one(op) db.last_processed_block.delete_many({}) db.last_processed_block.insert_one(op)
大致就是,把我们关心的操作读取回来,并且写到数据库中。
processed = db.processed_blockchains.find({'_id': op['_id']}).count()
判断我们是否已经处理过了,如果已经处理过,略过。
parse_blockchain(op)
这个判断是否是我们关心的数据,稍后详细讲
db.processed_blockchains.insert_one(op)
将op 插入我们处理过的op列表(mongodb)
db.last_processed_block.delete_many({})
db.last_processed_block.insert_one(op)
其实就是保存一下我们处理到哪里了,这样一旦因故障等问题中断,我们还可以接续上。
parse_blockchain(op) 前文我们说过,这个来获取我们关心的操作,这个是如何实现的呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 def parse_blockchain(op): settings = None message = None if op['type'] == 'account_update': settings = find_user_settings(op['account']) if settings and settings['account_update']: message = 'Received event: account_update (%s)' % op['account'] elif op['type'] in ['transfer', 'transfer_from_savings']: settings = find_user_settings(op['from']) if settings and settings[op['type']]: message = 'Received event: %s\nEvent detail: %s -> %s (%s)' % ( op['type'], op['from'], op['to'], op['amount'], ) .... if settings and message: db.notifications.insert_one({ 'username': settings['username'], 'email': settings['email'], 'telegram_channel_id': settings['telegram_channel_id'], 'message': message, 'email_sent': False, 'telegram_sent': False, 'created_at': datetime.utcnow(), }) return True
嗯,我好像发现了一个了不起的BUG,一会再说。 这里又出来一个函数
1 2 3 4 5 6 def find_user_settings(username): try: rows = db.settings.find({'username': username, 'confirmed': True}).sort('created_at', -1) return rows[0] except Exception: return dict()
这个函数的功能,就是按用户名,读取用户(已确认)的设置并返回。
1 2 3 4 if op['type'] == 'account_update': settings = find_user_settings(op['account']) if settings and settings['account_update']: message = 'Received event: account_update (%s)' % op['account']
这样,这段代码就好理解了,按白话文翻译就是 如果操作是更新账户, 那么我就去设置表读取这个用户的设置 如果存在设置表中有这个用户的设置 并且 这个用户设置了关心 ‘account_update’ 那么通知信息就是吧啦啦啦 是不是很好理解
因为操作只能是这些操作中的一种,所以要逐一判断一下,直到发现或者判断完毕没有发现为止。
1 2 3 4 5 6 7 8 9 10 11 12 if settings and message: db.notifications.insert_one({ 'username': settings['username'], 'email': settings['email'], 'telegram_channel_id': settings['telegram_channel_id'], 'message': message, 'email_sent': False, 'telegram_sent': False, 'created_at': datetime.utcnow(), }) return True
如果发现了需要处理的操作,将其插入notifications 表。
BUG,BUG 注意这个 return True, 注意缩进,OMG,少缩进了一个TAB有木有? 这样,原本只有我们处理到的op 才应该返回True, 结果统统返回True了。
你问我后果是啥?貌似没啥后果,浪费一些数据库空间和CPU运算能力而已。
总结
Blockchain Worker 将blockchain上和账户有关的动态抓取进来
判断是否是SteemData Notify 注册(并确认)用户相关的操作以及是否是用户关心的操作
如果是,写入数据库通知表notifications
咦,一总结好像也没啥内容呢,那我为啥写了这么多呢?晕,这啰嗦的毛病要改一改了。 初学者水平有限,如有谬误敬请指正,深表谢意啦。
感谢阅读 / Thank you for reading. 欢迎upvote、resteem以及 following me @oflyhigh 😎
This page is synchronized from the post: SteemData Notify 代码学习一: Blockchain Worker / Code Study of SteemData Notify: Part one