基于 MaxCompute MaxFrame 实现分布式 Pandas 处理

简介: 阿里云分布式计算框架 MaxCompute MaxFrame 兼容 Pandas 接口且自动进行分布式处理,在保证强大数据处理能力的同时,可以大幅度提高数据处理规模及计算效率。

MaxFrame可以在分布式环境下使用与Pandas相同的API来分析数据,通过MaxFrame,您能够以高于开源Pandas数十倍的性能在MaxCompute上快速完成数据分析和计算工作。本文为您介绍如何通过MaxFrame使用常用的Pandas算子。


前提条件

已安装MaxFrame,详情请参见准备工作


数据准备

  1. 在安装了MaxFrame的Python环境下运行如下脚本,准备测试表和测试数据。
from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
import pandas as pd
o = ODPS(
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)
data_sets = [{
    "table_name": "product",
    "table_schema" : "index bigint, product_id bigint, product_name string, current_price bigint",
    "source_type": "records",
    "records" : [
        [1, 100, 'Nokia', 1000],
        [2, 200, 'Apple', 5000],
        [3, 300, 'Samsung', 9000]
    ],
},
{
    "table_name" : "sales",
    "table_schema" : "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint",
    "source_type": "records",
    "records" : [
        [1, 1, 100, 101, 2008, 10, 5000],
        [2, 2, 300, 101, 2009, 7, 4000],
        [3, 4, 100, 102, 2011, 9, 4000],
        [4, 5, 200, 102, 2013, 6, 6000],
        [5, 8, 300, 102, 2015, 10, 9000],
        [6, 9, 100, 102, 2015, 6, 2000]
    ],
    "lifecycle": 5
}]
def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):
    for index, data in enumerate(data_sets):
        table_name = data.get("table_name")
        table_schema = data.get("table_schema")
        source_type = data.get("source_type")
        
        if not table_name or not table_schema or not source_type:
            raise ValueError(f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'.")
        lifecycle = data.get("lifecycle", 5)
        table_name += suffix
        
        print(f"Processing {table_name}...")
        if drop_if_exists:
            print(f"Deleting {table_name}...")
            o.delete_table(table_name, if_exists=True)
        
        o.create_table(name=table_name, table_schema=table_schema, lifecycle=lifecycle, if_not_exists=True)
        if source_type == "local_file":
            file_path = data.get("file")
            if not file_path:
                raise ValueError(f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key.")
            sep = data.get("sep", ",")
            pd_df = pd.read_csv(file_path, sep=sep)
            ODPSDataFrame(pd_df).persist(table_name, drop_table=True)
        elif source_type == 'records':
            records = data.get("records")
            if not records:
                raise ValueError(f"Dataset at index {index} with source_type 'records' is missing the 'records' key.")
            with o.get_table(table_name).open_writer() as writer:
                writer.write(records)
        else:
            raise ValueError(f"Unknown data set source_type: {source_type}")
        
        print(f"Processed {table_name} Done")
prepare_data(o, data_sets, "_maxframe_demo", True)


参数说明:

  • ALIBABA_CLOUD_ACCESS_KEY_ID:需将该环境变量设置为具备目标MaxCompute项目中待操作对象相关MaxCompute权限的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。
  • ALIBABA_CLOUD_ACCESS_KEY_SECRET:需将该环境变量设置为AccessKey ID对应的AccessKey Secret。
  • your-default-project:使用的MaxCompute项目名称。您可以登录MaxCompute控制台,在左侧导航栏选择工作区>项目管理,查看MaxCompute项目名称。
  • your-end-point:目标MaxCompute项目所在地域的Endpoint,可根据网络连接方式自行选择,例如http://service.cn-chengdu.maxcompute.aliyun.com/api。详情请参见Endpoint


  1. 查询sales_maxframe_demo表和product_maxframe_demo表的数据,SQL命令如下。
--查询sales_maxframe_demo表
SELECT * FROM sales_maxframe_demo;
--返回
+------------+------------+------------+------------+------------+------------+------------+
| index      | sale_id    | product_id | user_id    | year       | quantity   | price      |
+------------+------------+------------+------------+------------+------------+------------+
| 1          | 1          | 100        | 101        | 2008       | 10         | 5000       |
| 2          | 2          | 300        | 101        | 2009       | 7          | 4000       |
| 3          | 4          | 100        | 102        | 2011       | 9          | 4000       |
| 4          | 5          | 200        | 102        | 2013       | 6          | 6000       |
| 5          | 8          | 300        | 102        | 2015       | 10         | 9000       |
| 6          | 9          | 100        | 102        | 2015       | 6          | 2000       |
+------------+------------+------------+------------+------------+------------+------------+
--查询product_maxframe_demo表数据
SELECT * FROM product_maxframe_demo;
--返回
+------------+------------+--------------+---------------+
| index      | product_id | product_name | current_price |
+------------+------------+--------------+---------------+
| 1          | 100        | Nokia        | 1000          |
| 2          | 200        | Apple        | 5000          |
| 3          | 300        | Samsung      | 9000          |
+------------+------------+--------------+---------------+


使用MaxFrame进行数据分析

场景1:使用merge方法连接两张数据表,以获取sales_maxframe_demo表中所有sale_id对应的product_name以及该产品的所有yearprice

  • 示例代码
from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
o = ODPS(
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用 Access Key ID / Access Key Secret 字符串
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)
session = new_session(o)
#session id是一串用于关联MaxFrame task的字符,对于调试和追踪任务状态有重要的作用。
print(session.session_id)
sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
#这里的df并不会立即执行,除非您使用df.execute()来触发。
#这意味着所有的计算都将最终完全在MaxCompute集群完成,避免了中间所不必要的数据传输和阻塞。
df = sales.merge(product, left_on="product_id", right_index=True)
df = df[["product_name", "year", "price"]]
print(df.execute().fetch())
#保存结果到MaxCompute表中,并销毁Session
md.to_odps_table(df, "result_df", overwrite=True).execute()
session.destroy()


  • 返回结果:
index product_name  year  price                   
1            Nokia  2008   5000
2          Samsung  2009   4000
3            Nokia  2011   4000
4            Apple  2013   6000
5          Samsung  2015   9000
6            Nokia  2015   2000


  • 性能对比
    在sales表数据量为5000W条(size:1.96 GB),product表数据量为10W条(size:3 MB)的数据样本中进行运算,可以得到如下耗时对比结果:

环境

耗时(单位:秒)

本地Pandas(版本为1.3.5)

65.8

MaxFrame

22


场景2:选出每个出售过的产品第一年销售的产品ID、年份、数量和价格

  • 示例代码:
from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
o = ODPS(
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用 Access Key ID / Access Key Secret 字符串
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)
session = new_session(o)
#session id是一串用于关联MaxFrame task的字符,对于调试和追踪任务状态有重要的作用。
print(session.session_id)
# 聚合获取每个产品的第一个年份
min_year_df = md.read_odps_table("sales_maxframe_demo", index_col="index")
min_year_df = min_year_df.groupby('product_id', as_index=False).agg(first_year=('year', 'min'))
# join 找到对应的销售记录
sales = md.read_odps_table("sales_maxframe_demo", index_col=['product_id', 'year'])
result_df = md.merge(sales, min_year_df, 
                        left_index=True, 
                        right_on=['product_id','first_year'],
                        how='inner')
#这里的result_df并不会立即执行,除非您使用 result_df.execute()来触发。
#这意味着所有的计算都将最终完全在MaxCompute中集群完成,避免了中间所不必要的数据传输和阻塞。
result_df = result_df[['product_id', 'first_year', 'quantity', 'price']]
print(result_df.execute().fetch())
#销毁 Session
session.destroy()


  • 返回结果:
product_id  first_year  quantity  price
100         100        2008        10   5000
300         300        2009         7   4000
200         200        2013         6   6000


  • 性能对比:
    在sales表数据量为5000W条(size:1.96 GB),product表数据量为10W条(size:3 MB)的数据样本中进行运算,可以得到如下耗时对比结果:

环境

耗时(单位:秒)

本地Pandas(版本为1.3.5)

186

MaxFrame

21


场景3:为每个用户获取其消费最多的产品ID

说明:该场景将演示多次groupby、join、drop_duplicates和sort_values操作。


  • 示例代码
from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)
session = new_session(o)
#session id 是一串用于关联 MaxFrame task 的字符,对于调试和追踪任务状态有重要的作用。
print(session.session_id)
sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
sales['total'] = sales['price'] * sales['quantity']
product_cost_df = sales.groupby(['product_id', 'user_id'], as_index=False).agg(user_product_total=('total','sum'))
product_cost_df = product_cost_df.merge(product, left_on="product_id", right_index=True, how='right')
user_cost_df = product_cost_df.groupby('user_id').agg(max_total=('user_product_total', 'max'))
merge_df = product_cost_df.merge(user_cost_df, left_on='user_id', right_index=True)
#这里的 result_df 并不会立即执行,除非您使用 result_df.execute()来触发。
#这意味着所有的计算都将最终完全在 MaxCompute 中集群完成,避免了中间所不必要的数据传输和阻塞。
result_df = merge_df[merge_df['user_product_total'] == merge_df['max_total']][['user_id', 'product_id']].drop_duplicates().sort_values(['user_id'], ascending = [1])
print(result_df.execute().fetch())
#销毁 Session
session.destroy()


  • 返回结果:
user_id  product_id
100      101         100
300      102         300


  • 性能对比
    在sales表数据量为5000W条(size:1.96 GB),product表数据量为10W条(size:3 MB)的数据样本中进行计算,可以得到如下耗时对比结果:


结论

MaxFrame兼容Pandas接口且自动进行分布式处理,在保证强大数据处理能力的同时,可以大幅度提高数据处理规模及计算效率。



相关链接

  1. MaxFrame 详情页:https://www.aliyun.com/product/bigdata/odps/maxframe
  2. MaxFrame 试用申请表:https://survey.aliyun.com/apps/zhiliao/m40AIrxhA
  3. MaxFrame 试用特惠包:https://common-buy.aliyun.com/?spm=5176.29055221.J_2883378880.2.30e127f9nI7roF&commodityCode=odps_cu_dp_cn


MaxFrame官方用户支持钉钉群

(钉钉搜索37130012987)

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
2天前
|
机器学习/深度学习 分布式计算 数据挖掘
阿里云 MaxCompute MaxFrame 开启免费邀测,统一 Python 开发生态
阿里云 MaxCompute MaxFrame 正式开启邀测,统一 Python 开发生态,打破大数据及 AI 开发使用边界。
756 1
|
2天前
|
存储 SQL 数据可视化
Python大数据之pandas快速入门(一)
Python大数据之pandas快速入门(一)
78 0
|
数据采集 分布式计算 MaxCompute
pandas+odps实现批量化表信息解析
最近在做数仓建设,发现很多弊病都来自ods的数据没有探测好,之前都是人手工探查,看具体的字段注释是否是枚举或者最大值最小值等判定,再加上后续数据采集方式的调整,再次探查的成本会增大,我就寻思能够利用pandas来分析odps的数据。
420 0
|
SQL 数据采集 JSON
天池大数据竞赛 Spaceack带你利用Pandas,趋势图与桑基图分析美国选民候选人喜好度
竞赛地址: https://tianchi.aliyun.com/competition/entrance/531837/introduction 首先,这是一篇面向新人的教程导向的分析文章,(by the way其实我也是新手,从比赛开始才学的Pandas库,这也是我的一篇学习笔记),所以会包含很多函数的基础用法,解题思路等等, 流程会比较详细。 其次,本文在官方教程基础上会加入创新内容,但是绝不会为了用而用某种新方法,一定本着分析数据有所帮助的原则和对数据敬畏的态度来做。
396 0
天池大数据竞赛 Spaceack带你利用Pandas,趋势图与桑基图分析美国选民候选人喜好度
|
2天前
|
分布式计算 大数据 BI
MaxCompute产品使用合集之MaxCompute项目的数据是否可以被接入到阿里云的Quick BI中
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
SQL 分布式计算 大数据
MaxCompute产品使用合集之怎样可以将大数据计算MaxCompute表的数据可以导出为本地文件
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
分布式计算 DataWorks 关系型数据库
MaxCompute产品使用合集之可以使用什么方法将MySQL的数据实时同步到MaxCompute
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
分布式计算 DataWorks 数据库
DataWorks操作报错合集之DataWorks使用数据集成整库全增量同步oceanbase数据到odps的时候,遇到报错,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
25 0
|
2天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在 DataWorks 中,使用Oracle作为数据源进行数据映射和查询,如何更改数据源为MaxCompute或其他类型
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
31 1
|
2天前
|
分布式计算 DataWorks 调度
DataWorks产品使用合集之在DataWorks中,查看ODPS表的OSS对象如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
29 1

热门文章

最新文章

http://www.vxiaotou.com