有这么个故事(如有雷同,纯属巧合)。有一天,某运营同学给某开发同学一个excel文件,里面是个客户清单。

“帮我查下这些用户的消耗呢”。

开发同学扫了一眼,几百个用户。这个事肯定是可以办的,但是想到麻烦程度,开发同学心里肯定是有不少羊驼经过的啦。

“有点麻烦啊”,开发同学轻轻抱怨。

“我懂的,把这个表和ODPS里的表join下就好了嘛。”运营同学努努嘴。

“……”。于是,开发同学把excel数据导出成文本格式,然后dship上传到ODPS,ODPS上编写SQL,dship下载,大功告成。

这里说得很轻松,但其实整个过程真的挺麻烦呢。要是这个过程中还要对excel中的数据进行过滤,最终结果还要绘个图,还是需要不少时间。

但是,如果这个开发同学使用PyOdps 0.4+版本新特性,一切就都轻松写意了。

为了模拟这个过程,我们拿movielens 100K的数据做例子,现在本地有一个excel表格,里面有100个需要查询的用户,表格包含两个字段,分别是用户ID和年龄。在ODPS上,我们有一张电影评分表,现在我们要求出这100用户个中年龄在20-30之间,按每个年龄来求电影评分均值,并用条形图展现。

可以想象,这个过程如果按照前面的描述,有多麻烦。那么用PyOdps DataFrame API呢。

首先,我们读出本地Excel文件。

QQ20160406_0_2x

In [14]: from odps.df import read_excel

In [15]: users = read_excel('/Users/chine/userids.xlsx')

In [16]: users.head(10)
|==========================================|   1 /  1  (100.00%)         0s
Out[16]: 
    id  age
0   46   27
1  917   22
2  217   22
3  889   24
4  792   40
5  267   23
6  626   23
7  433   27
8  751   24
9  932   58

In [40]: users.count()
|==========================================|   1 /  1  (100.00%)         0s
100

然后我们用join语句,过滤出来电影评分表中这些用户的评分数据。

In [17]: ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))

In [18]: ratings.head(10)
|==========================================|   1 /  1  (100.00%)         2s
Out[18]: 
   user_id  movie_id  rating  unix_timestamp
0      196       242       3       881250949
1      186       302       3       891717742
2       22       377       1       878887116
3      244        51       2       880606923
4      166       346       1       886397596
5      298       474       4       884182806
6      115       265       2       881171488
7      253       465       5       891628467
8      305       451       3       886324817
9        6        86       3       883603013

In [25]: filter_ratings = ratings.join(users.filter(users.age.between(20, 30)), ('user_id', 'id'))[ratings, lambda x, y: y.age]  
# 这里做字段抽取时,可以使用Collection,也可以使用lambda表达式,参数是左右两个CollectionIn [26]: filter_ratings.head(10)
|==========================================|   1 /  1  (100.00%)        44s
Out[26]: 
   user_id  movie_id  rating  unix_timestamp  age
0        3       350       3       889237076   23
1        3       332       1       889237224   23
2        3       327       4       889237455   23
3        3       341       1       889237055   23
4        3       317       2       889237482   23
5        3       336       1       889237198   23
6        3       322       3       889237269   23
7        3       323       2       889237269   23
8        3       339       3       889237141   23
9        3       268       3       889236961   23

然后我们就可以按年龄聚合,求出评分均值啦。绘图也一气呵成。

In [28]: age_ratings = filter_ratings.groupby('age').agg(lambda x: x.rating.mean())

In [29]: age_ratings.head(10)
|==========================================|   1 /  1  (100.00%)        30s
Out[29]: 
   age  rating_mean
0   20     4.002309
1   21     4.051643
2   22     3.227513
3   23     3.519174
4   24     3.481013
5   25     3.774744
6   26     3.391509
7   27     3.355130
8   28     3.382883
9   29     3.705660

In [30]: age_ratings.plot(kind='bar', rot=45)
|==========================================|   1 /  1  (100.00%)        29s
Out[30]: <matplotlib.axes._subplots.AxesSubplot at 0x10b875f10>

age_ratings

超级简单,有木有!

这里的users其实是存在于本地的,而ratings是存在于ODPS上,用户依然可以join这两个Collection。其实对于0.4之前的版本,本地数据上传的接口也很容易(但是无法使用DataFrame API来进行本地过滤),但是对于0.4版本,不管一个Collection是存在于ODPS还是本地,用户都可以执行join和union的操作。

而这一切都源自0.4版本带来的新特性,DataFrame API的pandas计算后端。

DataFrame API使用pandas计算

我们知道,PyOdps DataFrame API类似于pandas的接口,但还是有些许不同的,那我们为什么不能用pandas来执行本地计算呢,这样也能充分利用pandas的一些特性,如支持各种数据输入。

所以,除了过去使用odps.models.Table来初始化DataFrame,我们也可以使用pandas DataFrame来初始化。

In [41]: import numpy as np

In [42]: import pandas as pd

In [44]: pandas_df = pd.DataFrame(np.random.random((10, 3)), columns=list('abc'))

In [45]: pandas_df
Out[45]: 
          a         b         c0  0.583845  0.301504  0.7642231  0.153269  0.335511  0.4551932  0.725460  0.460367  0.2947413  0.315234  0.907264  0.8493614  0.678395  0.642199  0.7460515  0.977872  0.841084  0.9315616  0.903927  0.846036  0.9824247  0.347098  0.373247  0.1938108  0.672611  0.242942  0.3817139  0.461411  0.687164  0.514689In [46]: df = DataFrame(pandas_df)

In [49]: type(df)
Out[49]: odps.df.core.DataFrame

In [47]: df.head(3)
|==========================================|   1 /  1  (100.00%)         0s
Out[47]: 
          a         b         c0  0.583845  0.301504  0.7642231  0.153269  0.335511  0.4551932  0.725460  0.460367  0.294741In [48]: df[df.a < 0.5].a.sum()
|==========================================|   1 /  1  (100.00%)         0s1.2770121422535428

这里转化成PyOdps DataFrame后,所有的计算也一样,变成延迟执行,PyOdps DataFrame在计算前的优化也同样适用。

这样做的好处是,除了前面我们提到的,能结合本地和ODPS做计算外;还有个好处就是方便进行本地调试。所以,我们可以用写出以下代码:

DEBUG = Trueif DEBUG:    # 这个操作使用tunnel下载,因此速度很快。对于分区表,需要给出所有分区值。
    df = ratings[:100].to_pandas(wrap=True)else:    df = ratings

在DEBUG的时候,我们能够利用PyOdps DataFrame在对原始表做切片操作时使用tunnel下载,速度很快的特性,选择原始表的一小部分数据来作为本地测试数据。值得注意的是, 本地计算通过不一定能在ODPS上也计算通过,比如自定义函数的沙箱限制 。

目前pandas计算后端尚不支持窗口函数。

apply和MapReduce API

使用apply对单行数据调用自定义函数

以前我们对于某个字段,能调用map来使用自定义函数,现在结合axis=1的apply,我们能对一行数据进行操作。

In [13]: ratings.apply(lambda row: row.rating / float(row.age), axis=1, reduce=True, types='float', names='rda').head(10)
|==========================================|   1 /  1  (100.00%)      1m44s
Out[13]: 
        rda0  0.1666671  0.1666672  0.2083333  0.2083334  0.1250005  0.2083336  0.1666677  0.2083338  0.2083339  0.125000

reduce为True的时候,会返回一个sequence,详细参考文档。

MapReduce API

PyOdps DataFrame API也提供MapReduce API。我们还是以movielens 100K为例子,看如何使用。

现在假设我们需要求出每部电影前两名的评分,直接上代码。

from odps.df import output

@output(['movie_id', 'movie_title', 'movie_rating'], ['int', 'string', 'int'])def mapper(row):    yield row.movie_id, row.title, row.rating

@output(['title', 'top_rating'], ['string', 'int'])def reducer(keys):
    i = [0]    def h(row, done):        if i[0] < 2:            yield row.movie_title, row.movie_rating
        i[0] += 1
    return h

In [7]: top_ratings = ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False)

In [10]: top_ratings.head(10)|==========================================|   1 /  1  (100.00%)      3m48s
Out[10]: 
               title  top_rating0   Toy Story (1995)           51   Toy Story (1995)           52   GoldenEye (1995)           53   GoldenEye (1995)           54  Four Rooms (1995)           55  Four Rooms (1995)           56  Get Shorty (1995)           57  Get Shorty (1995)           58     Copycat (1995)           59     Copycat (1995)           5

利用刚刚说的本地DEBUG特性,我们也能使用本地计算来验证,计算结果能很快得出。人生苦短!

In [22]: local_ratings = ratings[:100].to_pandas(wrap=True)
|==========================================|   1 /  1  (100.00%)         2sIn [23]: local_ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False).head(10)
|==========================================|   1 /  1  (100.00%)         0sOut[23]: 
                                               title  top_rating0  Shanghai Triad (Yao a yao yao dao waipo qiao) ...           51                              Twelve Monkeys (1995)           42                               Seven (Se7en) (1995)           43                         Usual Suspects, The (1995)           54                                 Postino, Il (1994)           35                          Mr. Holland's Opus (1995)           46                                 Taxi Driver (1976)           57                                       Crumb (1994)           58                                   Star Wars (1977)           59                                   Star Wars (1977)           5

cache机制

在0.4之前的版本,我们提供了一个persist接口,来保存执行结果。但是这个操作是个立即执行接口。现在我们提供cache接口,cache的collection会被单独计算,但不会立即执行。

In [25]: tmpdf = ratings[ratings.title.len() > 10].cache()In [26]: tmpdf['title', 'movie_id'].head(3)
|==========================================|   1 /  1  (100.00%)        35sOut[26]: 
                  title  movie_id0  Seven (Se7en) (1995)        111  Event Horizon (1997)       2602      Star Wars (1977)        50In [27]: tmpdf.count()  # tmpdf已经被cache,所以我们能立刻计算出数量
|==========================================|   1 /  1  (100.00%)         0s99823

记住,目前的cache接口,计算的结果还是要落地的,并不是存放在内存中。

而一个collection如果已经被计算过,这个过程会自动触发cache机制,后续的计算过程会从这计算个向后进行,而不再需要从头计算。

其他特性

PyOdps 0.4版本还带来一些其他特性,比如join支持mapjoin(只对ODPS后端有效);Sequence上支持unique和nunique;execute_sql执行时支持设置hints,对于IPython插件,支持使用SET来设置hints,等等。

PyOdps下一步计划

对于PyOdps的DataFrame API来说,我们的短期目标是能完成ODPS SQL能做的所有事情,然后在这个基础上再带来更多SQL不容易做到的,但是却很有用的操作。现在,除了自定义聚合函数,我们已经能基本涵盖所有的SQL场景。

PyOdps非常年轻,期待大家来使用、提feature、贡献代码。

  • 安装方法:pip install pyodps

  • Github:https://github.com/aliyun/aliyun-odps-python-sdk

  • 文档:http://pyodps.readthedocs.org/

  • bug report:https://github.com/aliyun/aliyun-odps-python-sdk/issues

原文链接:http://click.aliyun.com/m/14035/