Garland +

用装饰器给函数加缓存

简介

一个在外面流通的业务对象会被很多地方调用,比如一个商品,会出现在首页,店铺页,活动页,各种列表页等等, 每次查询都是都是通过商品 service 的查询方法 get_goods_by_id 来查询数据库得到 orm 对象,然后序列化, 返回给更上层的调用方,不加缓存的话一个对象会在多个地方被查询,这样就给数据库很大压力,那么缓存加在哪儿?

我们目前的缓存分为两种,简单讲的话就是一种能手动清,另一种不能手动清,能手动清理的目前只用在缓存 orm 对象实例 上,也就是数据库的一条记录以及实例上的方法,这边的 key 格式都固定(虽然是手写 23333),另一种不能手动清理的就是 方法根据函数和参数自己生成一个 key,只能依赖业务本身的性质去设置一个超时时间,这种一般用在缓存 API 接口的数据 或者接口的某个字段,或者某些返回 python 基本类型的方法。

为什么不缓存 orm 对象实例,因为代码服务化以后不希望 orm 上的操作方法暴露在 service 外面;为什么不用第二种根据方法和参数 自动生成 key 的缓存,因为这种流通的业务对象的性质决定了它的缓存是需要被清理的,比如商品的信息是随时会变的,而且根据 方法和参数自动生成的话,函数复杂的话就是几层的笛卡尔乘积,就根本没法清了。所以缓存不是万能的,需要限定下使用场景。

目前的问题是想缓存流通的业务对象,这种对象的查询在 service 层接口一般的方法是 batch_get_xxx_by_id,这里用 batch 是因为平时会遇到大量的批量查询需求,并且批量查询比给一堆 id 然后单个查询要快一个数量级(如果是主键的话), 入参是一个 id数组,返回值是和入参数组里 id 一一对应的一个 dict 类型的,值为 python 基本类型的以及我们定义 的 dto(data transfer object),也就是如下的方式


def _build_dto_from_dao(dao):
    # do something
    return dto


def batch_get_goods_by_id(goods_ids):
    if not goods_ids:
        return {}
    results = {goods_id: None for goods_id in goods_ids}
    daos = query_goods_daos_by_ids(goods_ids)
    for dao in daos:
        if not dao:
            continue
        results[dao.id] = _build_dto_from_dao(dao)
    return results

所以这个缓存要加在和上面类似的这种函数上,正确的讲应该是这次讨论的这个缓存函数。

基本调用方式

现在讲一下期望的缓存函数的设计,被缓存的函数样子上面已经讲了,所以缓存函数应该是这么个样子

@cache(expire=ONE_DAY)
def batch_get_goods_by_id(goods_ids):
    pass

清缓存的地方就多了,更新数据的地方有很多,每个方法都不太一样,这里主要分为两种,也就是函数参数里面传不传对象 ID 的, 传了对象ID的,可以把参数名传进清缓存的函数里自动清理,好处是函数内部没有混入清缓存的逻辑,没传对象ID,然后因为某些 逻辑需要在函数内部更新对象数据的,就需要手动清理了,和查询不一样,大部分情况下更新数据底层逻辑都是单个更新,一般没有 批量更新这种说法,所以要在函数逻辑内部加入清缓存的逻辑,需要传入对象ID,也就是如下的方式


@cache(expire=ONE_DAY)
def batch_get_goods_by_id(goods_ids):
    pass


@auto_clear(batch_get_goods_by_id, 'goods_id')
def update_goods_name(goods_id, new_goods_name):
    pass


def update_somethind(*args, **kwargs):
    # do something
    goods_dao = query_goods_dao_by_id(obj.goods_id)
    goods_dao.update_something

    # 清理缓存
    manual_clear(batch_get_goods_by_id, goods_dao.id)

具体实现

要用的几个方法名已经在上面讨论过了,然后存储用的是 Memcache,所以要在初始化的时候传一个 mc_client 进去, 然后加一个生成 key 的方法用来开发调试,所以具体的代码结构大概如下:

class SrvBatchCache(object):
    def __init__(self, mc):
        self.mc = mc

    def cache(self):
        pass

    def auto_clear(self):
        pass

    def manual_clear(self):
        pass

    def cache_key(self):
        pass

cache_key 函数的实现

先讲下缓存的 key 的实现逻辑,因为 cache 函数和 clear 函数都依赖这个。期望的调用方式是传入待缓存的函数和对应的 业务对象ID,返回的是一个 cache_key 字符串,也就是如下的调用方式:

def cache_key(self, cache_func, param_value):
    key_pattern = self._cache_key_pattern(cache_func)
    return key_pattern.format(param_value)

生成 key_pattern 的逻辑是分开的,所以要单独写个方法,现在看一下这个 key_pattern 怎么生成。key 要保证全局唯一, 只用函数名做 key 是不行的,不能完全保证不会有重名的方法,所以需要加上函数的 path;然后是被缓存的函数的唯一的参数名和对应 的业务ID;然后被缓存的函数比如重写了的话,返回值也乱了,所以还得塞一个 version 进去。所以最终是形如这样的 key

"/home/garland/src/current/broker/sys/goods_sys/api/query.pyc:batch_get_goods:goods_ids=100000001:version=0"

函数的相对路径可以用 inspect.getmodule 方法拿到,全路径可以用 os 库拿到,version 和参数名,是函数初始化的时候写入函数信息里面的, 所以 _cache_key_pattern 实现如下:

def _cache_key_pattern(self, f):
    module_path = inspect.getmodule(f).__file__
    full_path = os.path.abspath(module_path)
    version = f._cache_info.get('_version', 0)
    param_name = f._cache_info.get('_param_name', '')
    return '%s:%s:%s={}:version=%s' % (full_path, f.__name__, param_name, version)

cache 函数的实现

对 cache 函数的要求是在初始化的时候将一些基本信息如 version 和参数名打进被缓存的函数内,取缓存的逻辑放在 cache 函数以下。因为是带参数的装饰器,所以函数基本 的样子如下

from functools import wraps

def cache(self, version=0, expire=86400):
    def deco(f):
        # balabala

        @wraps(f)
        def _(*args, **kwargs):
            # balabala
            return something
        return _
    return deco

因为限定了使用场景,所以要在应用初始化的时候做一些基本的校验,校验以下被装饰的函数参数信息和 cache 函数的参数,这里要 用到 inspect 这个库里面的 getargspec 方法,方法的注释如下:

In [2]: inspect.getargspec?
Signature: inspect.getargspec(func)
Docstring:
Get the names and default values of a function's arguments.

A tuple of four things is returned: (args, varargs, varkw, defaults).
'args' is a list of the argument names (it may contain nested lists).
'varargs' and 'varkw' are the names of the * and ** arguments or None.
'defaults' is an n-tuple of the default values of the last n arguments.
File:      ~/miniconda2/envs/xcf/lib/python2.7/inspect.py
Type:      function

传入被装饰的函数,返回的 tuple 的第一个元素是参数列表,在这里校验下参数数量,然后把参数名信息打到函数上。在函数被调用的时候, 考虑到有时候线上出故障紧急修复,函数可以传入 force=True 参数,来强制刷新缓存,其他的就是缓存更新的策略,这里详细讲的话又是 另一篇文章了,可以参考 缓存更新的套路 这篇文章,具体到我们这里是用的就是 Cache Aside Pattern,即:

缓存更新的策略是个通用的策略,和这次写的这个针对特定函数的装饰器逻辑是分开的,所以代码可以分层,也就是如下的形式:

class BatchCache(object):

    def __init__(self):
        pass

    def batch_force_run(self):
        pass

    def batch_run(self):
        pass

所以具体的 cache 函数的实现如下:

def cache(self, version=0, expire=86400):
    '''缓存 batch_get_* 的返回值,有且只有一个 iterable 类型的参数'''
    def deco(f):
        # 基本校验
        arg_names, _, _, _ = inspect.getargspec(f)
        if len(arg_names) != 1:
            raise ValueError('only support one argument')
        if not isinstance(version, int):
            raise ValueError('version must be int type')

        # 相关信息打入要缓存的函数内
        param_name = arg_names[0]
        f._cache_info = {'_version': version, '_param_name': param_name}

        @wraps(f)
        def _(*args, **kwargs):
            force = kwargs.pop('force', False)
            # 获取参数值
            param_value = kwargs.get(param_name) or args[0]

            # 批量组装对象 id 和对应的 key
            keys_map = {oid: self.cache_key(f, oid) for oid in param_value}

            cache = BatchMutexCache(f, expire, keys_map, mc=self.mc)
            if force:
                # 强制刷新缓存
                return cache.batch_force_run(param_value)
            return cache.batch_run(param_value)
        return _
    return deco

*_clear 函数的实现

auto_clear 函数的实现

auto_clear 是在那些需要更新缓存的传了对象ID的方法里,把对应对象ID的参数名传到装饰器内生成 key 自动清理,这里比较麻烦的 地方是获取指定参数的 param_name 值,还是要用到 inspect.getargspec 这个方法, _get_param_value 方法实现如下:

def _get_param_value(self, f, param_name, *args, **kwargs):
    '''获取指定的参数 param_name 值'''
    arg_names, _, _, defaults = inspect.getargspec(f)
    if param_name not in arg_names:
        raise ValueError('{} not in {}'.format(param_name, arg_names))

    # 组装默认值参数
    default_args = dict(zip(arg_names[-len(defaults):], defaults)) if defaults else {}

    res = default_args.copy()
    res.update(zip(arg_names, args))
    res.update(kwargs)
    return res.get(param_name)

更新缓存的逻辑是,先写数据库,成功后,再让缓存失效,所以 auto_clear 的实现如下

def auto_clear(self, cache_func, param_name):
    '''自动清理 cache_func 的缓存'''
    def deco(f):
        @wraps(f)
        def _(*args, **kwargs):
            param_value = self._get_param_value(f, param_name, *args, **kwargs)
            key = self.cache_key(cache_func, param_value)
            res = f(*args, **kwargs)
            self.mc.delete(key)
            return res
        return _
    return deco

manual_clear 的实现

manual_clear 的实现就比较简单了,在代码逻辑的最后传入取缓存的函数和对象ID生成 key 然后删除就好了

def manual_clear(self, cache_func, param_value):
    key = self.cache_key(cache_func, param_value)
    self.mc.delete(key)

BatchCache 的实现

batch_force_run 的实现

batch_run 的实现

Blog

Thoughts

Project