在数据工程领域,Snowflake 一直以其卓越的SQL引擎而闻名,其云计算原生架构、自动优化功能和高并发能力使其成为处理复杂数据工作负载的首选。然而,面对日益复杂的AI驱动的分析需求,传统的基于SQL的转换已经显得力不从心。现在,Snowflake Cortex AI-SQL的出现,彻底改变了这一局面,它将AI能力无缝集成到SQL引擎中,为数据工程师开启了全新的可能性。

Snowflake Cortex:AI与SQL的完美融合

Snowflake Cortex 产品线允许用户直接在SQL中运行由大型语言模型 (LLM) 提供支持的查询,从而无需使用外部AI工具。这些功能实现了无缝的文本分析、情感检测、摘要和智能数据处理,无需离开 Snowflake 环境。这使得数据工程师和分析师即使没有机器学习方面的专业知识,也可以利用 AI 驱动的 SQL 函数,降低了 AI 的使用门槛,使其更易于访问、更高效且更具成本效益。通过将 AI 功能直接嵌入到 Snowflake 中,Snowflake Cortex 减少了对外部 AI 服务、API 集成和复杂模型管理的需求,从而降低了运营成本并简化了工作流程。

例如,想象你需要分析大量的客户评论数据,了解用户对某个产品的整体情感倾向。传统的方式可能需要将数据导出到外部的NLP工具,进行分析后再导入回来。而使用 Snowflake Cortex,你可以直接使用SNOWFLAKE.CORTEX.SENTIMENT(review)函数,在SQL查询中轻松完成情感分析,得到每个评论的情感极性(正面、负面或中性),大幅简化了流程并提升效率。

AI-SQL Operators:数据管道中的强大助手

Snowflake Cortex 引入了新的 AI-SQL Operators,例如 AI_FILTERAI_AGG (Private Preview),使数据工程师能够直接在其数据管道中利用大型语言模型 (LLM) 类似的功能,以便使用 LLM 类似的传统数据库运算符轻松分析多行。这些操作符为数据处理带来了革命性的变化,以下是一些实际用例:

  1. 图像过滤AI_FILTER 可以过滤掉所有包含可识别面部的图像。例如,假设你有一个包含大量用户上传图片的数据库,出于隐私保护的目的,你需要移除所有包含人脸的图片。使用AI_FILTER,你可以编写简单的SQL语句实现这一目标,无需手动审查每一张图片。

    SELECT * FROM image_table WHERE NOT SNOWFLAKE.CORTEX.AI_FILTER('contains an identifiable face', image_data);
    
  2. 信息聚合AI_AGG 可以总结表中的所有工单中的见解。例如,客户服务团队可能需要了解一段时间内客户提出的主要问题和投诉。AI_AGG 可以从工单记录中提取关键信息,并生成一份简洁明了的摘要报告,帮助团队快速了解客户需求并做出相应改进。

    SELECT ai_agg(ticket_content, 'Summarize the main issues and customer sentiment.') FROM customer_support_tickets;
    
  3. 实体标准化AI_AGG 可以对列中的所有实体名称进行规范化。例如,一个包含客户信息的表格可能存在姓名拼写不一致的问题,例如 “John Smith”、”Jon Smith” 和 “J. Smith”。AI_AGG 可以通过识别并统一这些拼写,提高数据质量和准确性。

    SELECT ai_agg(customer_name, 'Standardize customer names.') FROM customer_table;
    
  4. 数据关联:可以将包含图像数据的列与另一个表中的文本列连接起来。 例如,在电商平台中,可以将商品图片与商品描述文本连接起来,为用户提供更全面的商品信息。

构建AI驱动的数据管道:实践案例

文章中给出了一个利用 Snowflake Cortex 构建数据管道的示例,以下是对该示例的详细解读:

  1. 数据准备:首先,需要创建一个stage,将数据文件上传到该stage,然后创建一个文件格式,定义CSV文件的格式。接着,创建一个目标表 product_reviews_master,用于存储原始数据。

    create or replace stage my_stage;
    -- upload data file into this stage, using UI or SnowCLI
    ls @my_stage;
    CREATE FILE FORMAT LOAD_CSV_FF
    TYPE=CSV
    SKIP_HEADER=1
    FIELD_DELIMITER=','
    TRIM_SPACE=TRUE
    FIELD_OPTIONALLY_ENCLOSED_BY='"'
    REPLACE_INVALID_CHARACTERS=TRUE
    DATE_FORMAT=AUTO
    TIME_FORMAT=AUTO
    TIMESTAMP_FORMAT=AUTO;
    -- We will use this table to push data into raw data to create pipeline
    CREATE OR REPLACE TABLE product_reviews_master(
        product_id VARCHAR ,
        product_title VARCHAR ,
        rating NUMBER(38, 0) ,
        summary VARCHAR ,
        review VARCHAR ,
        location VARCHAR ,
        date VARCHAR ,
        upvotes NUMBER(38, 0) ,
        downvotes NUMBER(38, 0)
    );
    -- load data into table
    COPY INTO PRODUCT_REVIEWS_MASTER
    FROM @my_stage/Flipkart_Reviews.csv
    FILE_FORMAT = (FORMAT_NAME = 'LOAD_CSV_FF')
    -- make sure data
    SELECT COUNT(*) FROM PRODUCT_REVIEWS_MASTER;
    
  2. 数据管道构建:创建一个类似 product_reviews_master 的表 product_reviews,并创建一个stream product_reviews_stream,用于跟踪对 product_reviews 表的更改。

    -- data pipeline start here, create table and stream on table
    CREATE OR REPLACE TABLE product_reviews like product_reviews_master;
    CREATE OR REPLACE STREAM product_reviews_stream on table product_reviews;
    
  3. 数据注入:将数据从 product_reviews_master 表插入到 product_reviews 表中。在实际应用中,数据可能来自Snowpipe、Kafka等上游数据源。

    -- lets ingest some data ,
    -- In real world this data come from upstream sources,
    -- using snowpipe, kafka etc
    insert into PRODUCT_REVIEWS
    select * from product_reviews_master limit 10 offset 200;
    
  4. AI分析:使用 SNOWFLAKE.CORTEX.SENTIMENT 函数进行情感分析,使用 SNOWFLAKE.CORTEX.CLASSIFY_TEXT 函数进行文本分类,创建一个新表 product_review_analysis,用于存储分析结果。

    -- Above insert statment make data available in stream
    -- use AISQL Function to find sentiment and classification
    CREATE OR REPLACE TABLE product_review_analysis
    as
    SELECT
        product_id,
        review,
        SNOWFLAKE.CORTEX.SENTIMENT(review) AS sentiment,
        SNOWFLAKE.CORTEX.CLASSIFY_TEXT(review,ARRAY_CONSTRUCT('POSITIVE', 'NEUTRAL', 'NEGATIVE')) AS comment_category
    from PRODUCT_REVIEWS_STREAM;
    -- query data for analysis
    select * from product_review_analysis;
    
  5. 自动化:创建一个 Triggered Task,当stream中有新数据到达时,自动执行AI分析,并将结果插入到 product_review_analysis 表中。

    -- Now create Triggered Task when data arrive in the stream it will do
    -- transformation using ai powerd functions
    CREATE OR REPLACE TASK prod_review_analysis_task
    warehouse = demo_wh
    USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS = 15
    WHEN
    SYSTEM$STREAM_HAS_DATA('PRODUCT_REVIEWS_STREAM')
    AS
    INSERT INTO product_review_analysis(product_id, review, sentiment, comment_category)
    SELECT
        product_id,
        review,
        SNOWFLAKE.CORTEX.SENTIMENT(review) AS sentiment,
        SNOWFLAKE.CORTEX.CLASSIFY_TEXT(review,ARRAY_CONSTRUCT('POSITIVE', 'NEUTRAL', 'NEGATIVE')) AS comment_category
    FROM PRODUCT_REVIEWS_STREAM;
    ALTER task prod_review_analysis_task resume;
    -- test it out by ingesting more data into raw table
    insert into PRODUCT_REVIEWS
    select * from product_reviews_master limit 10 offset 400;
    -- in less than 15 second new data got transformed.
    select * from product_review_analysis;
    -- your data pipline is ready now
    

AI驱动的分析:营销团队的利器

在数据被转换后,下一步是执行 AI 驱动的分析。例如,营销团队可能希望向对产品提供负面反馈的用户提供促销活动。 使用 AI_FILTER 函数,你还可以推断缺失的信息,例如在只有城市名称可用时识别州/省。

SELECT * FROM PRODUCT_REVIEWS
WHERE
    SNOWFLAKE.CORTEX.AI_FILTER('The reviewer did not like product', review);
-- or person leaving in specific area
SELECT * FROM PRODUCT_REVIEWS
WHERE
    SNOWFLAKE.CORTEX.AI_FILTER('Is in Maharastra', location);
-- please note column do not have state Maharastra - but llm find it those city in Maharastra

此示例展示了如何使用 AI_FILTER 找出对产品不满意的用户,甚至可以根据城市名称推断用户的所在州/省,即使原始数据中没有提供州/省信息。

接下来,可以使用 AI_AGG 运算符获取基于位置的一些反馈的聚合反馈:

SELECT
    ai_agg(review, 'Summarize feedback for product and suggest quotes from those.')
FROM PRODUCT_REVIEWS
WHERE
    SNOWFLAKE.CORTEX.AI_FILTER('The reviewer did not like it', review)
    AND SNOWFLAKE.CORTEX.AI_FILTER('Location in Maharastra', location);

此示例展示了如何结合 AI_FILTERAI_AGG,找出对产品不满意的位于特定位置(例如马哈拉施特拉邦)的用户,并生成反馈摘要,以及用户评价的引用,帮助营销团队更好地了解用户需求并制定个性化的营销策略。

非结构化数据处理:图像到数据的转换

Snowflake Cortex 还可以用于处理非结构化数据,例如图像。文章中展示了如何将图表转换为数据:

  1. 创建Stage:首先,创建一个stage,用于存储非结构化数据。

    -- stage with serverside encryption
    CREATE STAGE unstructured_data
    ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
    -- upload above image into this stage
    
  2. 创建Table:创建一个table,用于存储从图表中解析出的数据。

    --create table to store data from charts
    create or replace table chart_data(out_data variant);
    
  3. 数据解析:使用 Snowflake Cortex 的 Claude 模型解析图像,将图表转换为 JSON 文件,并将数据插入到表中。

    -- Parse data using cortex cluade model
    insert into chart_data
    SELECT TO_VARIANT (PARSE_JSON (SNOWFLAKE.CORTEX.COMPLETE ('claude-3-5-sonnet','Turn this pie chart into a JSON file containing two attributes for each pie: spending and percentage. Output should contain a valid JSON file only, no other texts.',TO_FILE ('@unstructured_data', 'usbudgetspending.png' ))));
    
  4. 数据查询:从表中查询解析出的数据,进行分析。

    SELECT
        chart.value:spending::string AS spending,
        chart.value: percentage::numeric(4,2) AS PERCENTAGE
    FROM chart_data AS c,LATERAL FLATTEN (INPUT => c.out_data, PATH=> 'spending_data') AS chart
    ORDER BY
        PERCENTAGE desc;
    

此示例展示了 Snowflake Cortex 如何将非结构化数据转换为结构化数据,从而可以对其进行分析和利用。 这极大地扩展了 Snowflake 的应用场景,使其能够处理各种类型的数据,为用户提供更全面的数据洞察。

Snowflake AI App Framework:构建智能应用

除了 AI-SQL Operators,Snowflake 还提供了 Snowflake AI App Framework, 允许开发者在 Snowflake 平台上构建和部署 AI 驱动的应用程序。这个框架提供了一系列的工具和 API,可以简化 AI 应用的开发和部署过程,使开发者能够更快速地构建智能应用,解决各种业务问题。

结论:迎接数据工程的AI未来

Snowflake Cortex AI-SQL Operators 代表了数据工程领域的重大飞跃。 通过将 AI 驱动的功能直接嵌入到 SQL 中,它们能够实现更快、更智能、更可扩展的数据转换流程。 随着 AI 驱动的分析成为主流,拥抱像 Cortex SQL Operators 这样的工具对于现代数据工程师来说至关重要。

凭借其一流的 SQL 引擎,Snowflake 不仅简化了复杂的数据工作负载,还使工程师能够突破 AI 驱动分析的可能性边界。 通过 Snowflake CortexAISQL 结合,数据工程师能够以更高效、更智能的方式处理和分析数据,从而为企业创造更大的价值。Snowflake Cortex AI-SQL,毫无疑问是数据工程的未来。拥抱 Snowflake Cortex AI-SQL,就是拥抱数据工程的未来!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注