在数据工程领域,Snowflake 一直以其卓越的SQL引擎而闻名,其云计算原生架构、自动优化功能和高并发能力使其成为处理复杂数据工作负载的首选。然而,面对日益复杂的AI驱动的分析需求,传统的基于SQL的转换已经显得力不从心。现在,Snowflake Cortex AI-SQL的出现,彻底改变了这一局面,它将AI能力无缝集成到SQL引擎中,为数据工程师开启了全新的可能性。
Snowflake Cortex:AI与SQL的完美融合
Snowflake Cortex 产品线允许用户直接在SQL中运行由大型语言模型 (LLM) 提供支持的查询,从而无需使用外部AI工具。这些功能实现了无缝的文本分析、情感检测、摘要和智能数据处理,无需离开 Snowflake 环境。这使得数据工程师和分析师即使没有机器学习方面的专业知识,也可以利用 AI 驱动的 SQL 函数,降低了 AI 的使用门槛,使其更易于访问、更高效且更具成本效益。通过将 AI 功能直接嵌入到 Snowflake 中,Snowflake Cortex 减少了对外部 AI 服务、API 集成和复杂模型管理的需求,从而降低了运营成本并简化了工作流程。
例如,想象你需要分析大量的客户评论数据,了解用户对某个产品的整体情感倾向。传统的方式可能需要将数据导出到外部的NLP工具,进行分析后再导入回来。而使用 Snowflake Cortex,你可以直接使用SNOWFLAKE.CORTEX.SENTIMENT(review)
函数,在SQL查询中轻松完成情感分析,得到每个评论的情感极性(正面、负面或中性),大幅简化了流程并提升效率。
AI-SQL Operators:数据管道中的强大助手
Snowflake Cortex 引入了新的 AI-SQL Operators,例如 AI_FILTER
和 AI_AGG
(Private Preview),使数据工程师能够直接在其数据管道中利用大型语言模型 (LLM) 类似的功能,以便使用 LLM 类似的传统数据库运算符轻松分析多行。这些操作符为数据处理带来了革命性的变化,以下是一些实际用例:
-
图像过滤:
AI_FILTER
可以过滤掉所有包含可识别面部的图像。例如,假设你有一个包含大量用户上传图片的数据库,出于隐私保护的目的,你需要移除所有包含人脸的图片。使用AI_FILTER
,你可以编写简单的SQL语句实现这一目标,无需手动审查每一张图片。SELECT * FROM image_table WHERE NOT SNOWFLAKE.CORTEX.AI_FILTER('contains an identifiable face', image_data);
-
信息聚合:
AI_AGG
可以总结表中的所有工单中的见解。例如,客户服务团队可能需要了解一段时间内客户提出的主要问题和投诉。AI_AGG
可以从工单记录中提取关键信息,并生成一份简洁明了的摘要报告,帮助团队快速了解客户需求并做出相应改进。SELECT ai_agg(ticket_content, 'Summarize the main issues and customer sentiment.') FROM customer_support_tickets;
-
实体标准化:
AI_AGG
可以对列中的所有实体名称进行规范化。例如,一个包含客户信息的表格可能存在姓名拼写不一致的问题,例如 “John Smith”、”Jon Smith” 和 “J. Smith”。AI_AGG
可以通过识别并统一这些拼写,提高数据质量和准确性。SELECT ai_agg(customer_name, 'Standardize customer names.') FROM customer_table;
-
数据关联:可以将包含图像数据的列与另一个表中的文本列连接起来。 例如,在电商平台中,可以将商品图片与商品描述文本连接起来,为用户提供更全面的商品信息。
构建AI驱动的数据管道:实践案例
文章中给出了一个利用 Snowflake Cortex 构建数据管道的示例,以下是对该示例的详细解读:
-
数据准备:首先,需要创建一个stage,将数据文件上传到该stage,然后创建一个文件格式,定义CSV文件的格式。接着,创建一个目标表
product_reviews_master
,用于存储原始数据。create or replace stage my_stage; -- upload data file into this stage, using UI or SnowCLI ls @my_stage; CREATE FILE FORMAT LOAD_CSV_FF TYPE=CSV SKIP_HEADER=1 FIELD_DELIMITER=',' TRIM_SPACE=TRUE FIELD_OPTIONALLY_ENCLOSED_BY='"' REPLACE_INVALID_CHARACTERS=TRUE DATE_FORMAT=AUTO TIME_FORMAT=AUTO TIMESTAMP_FORMAT=AUTO; -- We will use this table to push data into raw data to create pipeline CREATE OR REPLACE TABLE product_reviews_master( product_id VARCHAR , product_title VARCHAR , rating NUMBER(38, 0) , summary VARCHAR , review VARCHAR , location VARCHAR , date VARCHAR , upvotes NUMBER(38, 0) , downvotes NUMBER(38, 0) ); -- load data into table COPY INTO PRODUCT_REVIEWS_MASTER FROM @my_stage/Flipkart_Reviews.csv FILE_FORMAT = (FORMAT_NAME = 'LOAD_CSV_FF') -- make sure data SELECT COUNT(*) FROM PRODUCT_REVIEWS_MASTER;
-
数据管道构建:创建一个类似
product_reviews_master
的表product_reviews
,并创建一个streamproduct_reviews_stream
,用于跟踪对product_reviews
表的更改。-- data pipeline start here, create table and stream on table CREATE OR REPLACE TABLE product_reviews like product_reviews_master; CREATE OR REPLACE STREAM product_reviews_stream on table product_reviews;
-
数据注入:将数据从
product_reviews_master
表插入到product_reviews
表中。在实际应用中,数据可能来自Snowpipe、Kafka等上游数据源。-- lets ingest some data , -- In real world this data come from upstream sources, -- using snowpipe, kafka etc insert into PRODUCT_REVIEWS select * from product_reviews_master limit 10 offset 200;
-
AI分析:使用
SNOWFLAKE.CORTEX.SENTIMENT
函数进行情感分析,使用SNOWFLAKE.CORTEX.CLASSIFY_TEXT
函数进行文本分类,创建一个新表product_review_analysis
,用于存储分析结果。-- Above insert statment make data available in stream -- use AISQL Function to find sentiment and classification CREATE OR REPLACE TABLE product_review_analysis as SELECT product_id, review, SNOWFLAKE.CORTEX.SENTIMENT(review) AS sentiment, SNOWFLAKE.CORTEX.CLASSIFY_TEXT(review,ARRAY_CONSTRUCT('POSITIVE', 'NEUTRAL', 'NEGATIVE')) AS comment_category from PRODUCT_REVIEWS_STREAM; -- query data for analysis select * from product_review_analysis;
-
自动化:创建一个 Triggered Task,当stream中有新数据到达时,自动执行AI分析,并将结果插入到
product_review_analysis
表中。-- Now create Triggered Task when data arrive in the stream it will do -- transformation using ai powerd functions CREATE OR REPLACE TASK prod_review_analysis_task warehouse = demo_wh USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS = 15 WHEN SYSTEM$STREAM_HAS_DATA('PRODUCT_REVIEWS_STREAM') AS INSERT INTO product_review_analysis(product_id, review, sentiment, comment_category) SELECT product_id, review, SNOWFLAKE.CORTEX.SENTIMENT(review) AS sentiment, SNOWFLAKE.CORTEX.CLASSIFY_TEXT(review,ARRAY_CONSTRUCT('POSITIVE', 'NEUTRAL', 'NEGATIVE')) AS comment_category FROM PRODUCT_REVIEWS_STREAM; ALTER task prod_review_analysis_task resume; -- test it out by ingesting more data into raw table insert into PRODUCT_REVIEWS select * from product_reviews_master limit 10 offset 400; -- in less than 15 second new data got transformed. select * from product_review_analysis; -- your data pipline is ready now
AI驱动的分析:营销团队的利器
在数据被转换后,下一步是执行 AI 驱动的分析。例如,营销团队可能希望向对产品提供负面反馈的用户提供促销活动。 使用 AI_FILTER
函数,你还可以推断缺失的信息,例如在只有城市名称可用时识别州/省。
SELECT * FROM PRODUCT_REVIEWS
WHERE
SNOWFLAKE.CORTEX.AI_FILTER('The reviewer did not like product', review);
-- or person leaving in specific area
SELECT * FROM PRODUCT_REVIEWS
WHERE
SNOWFLAKE.CORTEX.AI_FILTER('Is in Maharastra', location);
-- please note column do not have state Maharastra - but llm find it those city in Maharastra
此示例展示了如何使用 AI_FILTER
找出对产品不满意的用户,甚至可以根据城市名称推断用户的所在州/省,即使原始数据中没有提供州/省信息。
接下来,可以使用 AI_AGG
运算符获取基于位置的一些反馈的聚合反馈:
SELECT
ai_agg(review, 'Summarize feedback for product and suggest quotes from those.')
FROM PRODUCT_REVIEWS
WHERE
SNOWFLAKE.CORTEX.AI_FILTER('The reviewer did not like it', review)
AND SNOWFLAKE.CORTEX.AI_FILTER('Location in Maharastra', location);
此示例展示了如何结合 AI_FILTER
和 AI_AGG
,找出对产品不满意的位于特定位置(例如马哈拉施特拉邦)的用户,并生成反馈摘要,以及用户评价的引用,帮助营销团队更好地了解用户需求并制定个性化的营销策略。
非结构化数据处理:图像到数据的转换
Snowflake Cortex 还可以用于处理非结构化数据,例如图像。文章中展示了如何将图表转换为数据:
-
创建Stage:首先,创建一个stage,用于存储非结构化数据。
-- stage with serverside encryption CREATE STAGE unstructured_data ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE'); -- upload above image into this stage
-
创建Table:创建一个table,用于存储从图表中解析出的数据。
--create table to store data from charts create or replace table chart_data(out_data variant);
-
数据解析:使用 Snowflake Cortex 的 Claude 模型解析图像,将图表转换为 JSON 文件,并将数据插入到表中。
-- Parse data using cortex cluade model insert into chart_data SELECT TO_VARIANT (PARSE_JSON (SNOWFLAKE.CORTEX.COMPLETE ('claude-3-5-sonnet','Turn this pie chart into a JSON file containing two attributes for each pie: spending and percentage. Output should contain a valid JSON file only, no other texts.',TO_FILE ('@unstructured_data', 'usbudgetspending.png' ))));
-
数据查询:从表中查询解析出的数据,进行分析。
SELECT chart.value:spending::string AS spending, chart.value: percentage::numeric(4,2) AS PERCENTAGE FROM chart_data AS c,LATERAL FLATTEN (INPUT => c.out_data, PATH=> 'spending_data') AS chart ORDER BY PERCENTAGE desc;
此示例展示了 Snowflake Cortex 如何将非结构化数据转换为结构化数据,从而可以对其进行分析和利用。 这极大地扩展了 Snowflake 的应用场景,使其能够处理各种类型的数据,为用户提供更全面的数据洞察。
Snowflake AI App Framework:构建智能应用
除了 AI-SQL Operators,Snowflake 还提供了 Snowflake AI App Framework, 允许开发者在 Snowflake 平台上构建和部署 AI 驱动的应用程序。这个框架提供了一系列的工具和 API,可以简化 AI 应用的开发和部署过程,使开发者能够更快速地构建智能应用,解决各种业务问题。
结论:迎接数据工程的AI未来
Snowflake Cortex AI-SQL Operators 代表了数据工程领域的重大飞跃。 通过将 AI 驱动的功能直接嵌入到 SQL 中,它们能够实现更快、更智能、更可扩展的数据转换流程。 随着 AI 驱动的分析成为主流,拥抱像 Cortex SQL Operators 这样的工具对于现代数据工程师来说至关重要。
凭借其一流的 SQL 引擎,Snowflake 不仅简化了复杂的数据工作负载,还使工程师能够突破 AI 驱动分析的可能性边界。 通过 Snowflake Cortex 将 AI 与 SQL 结合,数据工程师能够以更高效、更智能的方式处理和分析数据,从而为企业创造更大的价值。Snowflake Cortex AI-SQL,毫无疑问是数据工程的未来。拥抱 Snowflake Cortex AI-SQL,就是拥抱数据工程的未来!