Garland +

用装饰器给函数加缓存

一个在外面流通的业务对象会被很多地方调用,比如一个商品,会出现在首页,店铺页,活动页,各种列表页等等, 每次查询都是都是通过商品 service 的查询方法 get_goods_by_id 来查询数据库得到 orm 对象,然后序列化, 返回给更上层的调用方,不加缓存的话一个对象会在多个地方被查询,这样就给数据库很大压力,那么缓存加在哪儿?

我们目前的缓存分为两种,简单讲的话就是一种能手动清,另一种不能手动清,能手动清理的目前只用在缓存 orm 对象实例 上,也就是数据库的一条记录以及实例上的方法,这边的 key 格式都固定(虽然是手写 23333),另一种不能手动清理的就是 方法根据函数和参数自己生成一个 key,只能依赖业务本身的性质去设置一个超时时间,这种一般用在缓存 API 接口的数据 或者接口的某个字段,或者某些返回 python 基本类型的方法。

为什么不缓存 orm 对象实例,因为代码服务化以后不希望 orm 上的操作方法暴露在 service 外面;为什么不用第二种根据方法和参数 自动生成 key 的缓存,因为这种流通的业务对象的性质决定了它的缓存是需要被清理的,比如商品的信息是随时会变的,而且根据 方法和参数自动生成的话,函数复杂的话就是几层的笛卡尔乘积,就根本没法清了。所以缓存不是万能的,需要限定下使用场景。

目前的问题是想缓存流通的业务对象,这种对象的查询在 service 层接口一般的方法是 batch_get_xxx_by_id,这里用 batch 是因为平时会遇到大量的批量查询需求,并且批量查询比给一堆 id 然后单个查询要快一个数量级(如果是主键的话), 入参是一个 id数组,返回值是和入参数组里 id 一一对应的一个 dict 类型的,值为 python 基本类型的以及我们定义 的 dto(data transfer object),也就是如下的方式

  1. def _build_dto_from_dao(dao):
  2. # do something
  3. return dto
  4. def batch_get_goods_by_id(goods_ids):
  5. if not goods_ids:
  6. return {}
  7. results = {goods_id: None for goods_id in goods_ids}
  8. daos = query_goods_daos_by_ids(goods_ids)
  9. for dao in daos:
  10. if not dao:
  11. continue
  12. results[dao.id] = _build_dto_from_dao(dao)
  13. return results

所以这个缓存要加在和上面类似的这种函数上,正确的讲应该是这次讨论的这个缓存函数。

现在讲一下期望的缓存函数的设计,被缓存的函数样子上面已经讲了,所以缓存函数应该是这么个样子

  1. @cache(expire=ONE_DAY)
  2. def batch_get_goods_by_id(goods_ids):
  3. pass

清缓存的地方就多了,更新数据的地方有很多,每个方法都不太一样,这里主要分为两种,也就是函数参数里面传不传对象 ID 的, 传了对象ID的,可以把参数名传进清缓存的函数里自动清理,好处是函数内部没有混入清缓存的逻辑,没传对象ID,然后因为某些 逻辑需要在函数内部更新对象数据的,就需要手动清理了,和查询不一样,大部分情况下更新数据底层逻辑都是单个更新,一般没有 批量更新这种说法,所以要在函数逻辑内部加入清缓存的逻辑,需要传入对象ID,也就是如下的方式

  1. @cache(expire=ONE_DAY)
  2. def batch_get_goods_by_id(goods_ids):
  3. pass
  4. @auto_clear(batch_get_goods_by_id, 'goods_id')
  5. def update_goods_name(goods_id, new_goods_name):
  6. pass
  7. def update_somethind(*args, **kwargs):
  8. # do something
  9. goods_dao = query_goods_dao_by_id(obj.goods_id)
  10. goods_dao.update_something
  11. # 清理缓存
  12. manual_clear(batch_get_goods_by_id, goods_dao.id)

要用的几个方法名已经在上面讨论过了,然后存储用的是 Memcache,所以要在初始化的时候传一个 mc_client 进去, 然后加一个生成 key 的方法用来开发调试,所以具体的代码结构大概如下:

  1. class SrvBatchCache(object):
  2. def __init__(self, mc):
  3. self.mc = mc
  4. def cache(self):
  5. pass
  6. def auto_clear(self):
  7. pass
  8. def manual_clear(self):
  9. pass
  10. def cache_key(self):
  11. pass

先讲下缓存的 key 的实现逻辑,因为 cache 函数和 clear 函数都依赖这个。期望的调用方式是传入待缓存的函数和对应的 业务对象ID,返回的是一个 cache_key 字符串,也就是如下的调用方式:

  1. def cache_key(self, cache_func, param_value):
  2. key_pattern = self._cache_key_pattern(cache_func)
  3. return key_pattern.format(param_value)

生成 key_pattern 的逻辑是分开的,所以要单独写个方法,现在看一下这个 key_pattern 怎么生成。key 要保证全局唯一, 只用函数名做 key 是不行的,不能完全保证不会有重名的方法,所以需要加上函数的 path;然后是被缓存的函数的唯一的参数名和对应 的业务ID;然后被缓存的函数比如重写了的话,返回值也乱了,所以还得塞一个 version 进去。所以最终是形如这样的 key

  1. "/home/garland/src/current/broker/sys/goods_sys/api/query.pyc:batch_get_goods:goods_ids=100000001:version=0"

函数的相对路径可以用 inspect.getmodule 方法拿到,全路径可以用 os 库拿到,version 和参数名,是函数初始化的时候写入函数信息里面的, 所以 _cache_key_pattern 实现如下:

  1. def _cache_key_pattern(self, f):
  2. module_path = inspect.getmodule(f).__file__
  3. full_path = os.path.abspath(module_path)
  4. version = f._cache_info.get('_version', 0)
  5. param_name = f._cache_info.get('_param_name', '')
  6. return '%s:%s:%s={}:version=%s' % (full_path, f.__name__, param_name, version)

对 cache 函数的要求是在初始化的时候将一些基本信息如 version 和参数名打进被缓存的函数内,取缓存的逻辑放在 cache 函数以下。因为是带参数的装饰器,所以函数基本 的样子如下

  1. from functools import wraps
  2. def cache(self, version=0, expire=86400):
  3. def deco(f):
  4. # balabala
  5. @wraps(f)
  6. def _(*args, **kwargs):
  7. # balabala
  8. return something
  9. return _
  10. return deco

因为限定了使用场景,所以要在应用初始化的时候做一些基本的校验,校验以下被装饰的函数参数信息和 cache 函数的参数,这里要 用到 inspect 这个库里面的 getargspec 方法,方法的注释如下:

  1. In [2]: inspect.getargspec?
  2. Signature: inspect.getargspec(func)
  3. Docstring:
  4. Get the names and default values of a function's arguments.
  5. A tuple of four things is returned: (args, varargs, varkw, defaults).
  6. 'args' is a list of the argument names (it may contain nested lists).
  7. 'varargs' and 'varkw' are the names of the * and ** arguments or None.
  8. 'defaults' is an n-tuple of the default values of the last n arguments.
  9. File: ~/miniconda2/envs/xcf/lib/python2.7/inspect.py
  10. Type: function

传入被装饰的函数,返回的 tuple 的第一个元素是参数列表,在这里校验下参数数量,然后把参数名信息打到函数上。在函数被调用的时候, 考虑到有时候线上出故障紧急修复,函数可以传入 force=True 参数,来强制刷新缓存,其他的就是缓存更新的策略,这里详细讲的话又是 另一篇文章了,可以参考 缓存更新的套路 这篇文章,具体到我们这里是用的就是 Cache Aside Pattern,即:

缓存更新的策略是个通用的策略,和这次写的这个针对特定函数的装饰器逻辑是分开的,所以代码可以分层,也就是如下的形式:

  1. class BatchCache(object):
  2. def __init__(self):
  3. pass
  4. def batch_force_run(self):
  5. pass
  6. def batch_run(self):
  7. pass

所以具体的 cache 函数的实现如下:

  1. def cache(self, version=0, expire=86400):
  2. '''缓存 batch_get_* 的返回值,有且只有一个 iterable 类型的参数'''
  3. def deco(f):
  4. # 基本校验
  5. arg_names, _, _, _ = inspect.getargspec(f)
  6. if len(arg_names) != 1:
  7. raise ValueError('only support one argument')
  8. if not isinstance(version, int):
  9. raise ValueError('version must be int type')
  10. # 相关信息打入要缓存的函数内
  11. param_name = arg_names[0]
  12. f._cache_info = {'_version': version, '_param_name': param_name}
  13. @wraps(f)
  14. def _(*args, **kwargs):
  15. force = kwargs.pop('force', False)
  16. # 获取参数值
  17. param_value = kwargs.get(param_name) or args[0]
  18. # 批量组装对象 id 和对应的 key
  19. keys_map = {oid: self.cache_key(f, oid) for oid in param_value}
  20. cache = BatchMutexCache(f, expire, keys_map, mc=self.mc)
  21. if force:
  22. # 强制刷新缓存
  23. return cache.batch_force_run(param_value)
  24. return cache.batch_run(param_value)
  25. return _
  26. return deco

auto_clear 函数的实现

auto_clear 是在那些需要更新缓存的传了对象ID的方法里,把对应对象ID的参数名传到装饰器内生成 key 自动清理,这里比较麻烦的 地方是获取指定参数的 param_name 值,还是要用到 inspect.getargspec 这个方法, _get_param_value 方法实现如下:

  1. def _get_param_value(self, f, param_name, *args, **kwargs):
  2. '''获取指定的参数 param_name 值'''
  3. arg_names, _, _, defaults = inspect.getargspec(f)
  4. if param_name not in arg_names:
  5. raise ValueError('{} not in {}'.format(param_name, arg_names))
  6. # 组装默认值参数
  7. default_args = dict(zip(arg_names[-len(defaults):], defaults)) if defaults else {}
  8. res = default_args.copy()
  9. res.update(zip(arg_names, args))
  10. res.update(kwargs)
  11. return res.get(param_name)

更新缓存的逻辑是,先写数据库,成功后,再让缓存失效,所以 auto_clear 的实现如下

  1. def auto_clear(self, cache_func, param_name):
  2. '''自动清理 cache_func 的缓存'''
  3. def deco(f):
  4. @wraps(f)
  5. def _(*args, **kwargs):
  6. param_value = self._get_param_value(f, param_name, *args, **kwargs)
  7. key = self.cache_key(cache_func, param_value)
  8. res = f(*args, **kwargs)
  9. self.mc.delete(key)
  10. return res
  11. return _
  12. return deco

manual_clear 的实现

manual_clear 的实现就比较简单了,在代码逻辑的最后传入取缓存的函数和对象ID生成 key 然后删除就好了

  1. def manual_clear(self, cache_func, param_value):
  2. key = self.cache_key(cache_func, param_value)
  3. self.mc.delete(key)

batch_force_run 的实现

batch_run 的实现

—————2019-03—————–

烂尾了

烂尾了

烂尾了

言:
你在我规划的航程上,我在你投射的视线里。

Blog

Thoughts

Project