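The Pig Language

Author: 马士华 Published: 2008-07-02 14:00 Last updated: 2008-07-02 14:01
Copyright notice: This article may be freely reposted, provided the repost links to the original source and credits the author.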
http://www.hadoop.org.cn/hadoop/pig-language/

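Pig is a project that Yahoo donated to Apache. It is still in the Apache Incubator, but its basic functionality is already usable, so today I will introduce it. Pig is a SQL-like, high-level query language built on top of MapReduce: it compiles operations into the Map and Reduce phases of the MapReduce model, and users can define their own functions. It was developed by Yahoo's grid computing team and is another clone of a Google project, in this case Sawzall.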

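Supported operations
Arithmetic operators: +, -, *, /
Various data types: string, int, float, long, etc.
Comparison operators: ==, !=, >, >=, <, <=, eq, neq, gt, gte, lt, lte, matches
Complex data types: bag, tuple, map
Relational operations: FILTER, GROUP BY, ORDER, DISTINCT, UNION, JOIN, FOREACH ... GENERATE
Aggregate functions: COUNT, SUM, AVG, MIN, MAX, etc.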

Primitive types supported by Pig: int, long, float, double, char array, byte array

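Pig's internal data types:
bag: a collection of tuples, written as {<1,2>,<3,4>}
tuple: an ordered sequence of fields, written as <pig,3.14>
map: a set of key/value pairs, written as ['pig':<'load','store'>,'web':'hadoop.apache.org']
atom: a single primitive value, stored as a string but convertible to a numeric type, written as 'apache.org' or '2.3'.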

Data representation:

t = < 1, {<2,3>,<4,6>,<5,7>}, ['apache':'hadoop']>

In the example above, a tuple is bound to the name t, so t has three fields: f1, f2 and f3. We can reach 1 through t.f1 or t.$0, {<2,3>,<4,6>,<5,7>} through t.f2 or t.$1, and ['apache':'hadoop'] through t.f3 or t.$2.
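
To make the positional access concrete, here is a minimal Python sketch that models the tuple t with ordinary Python values. It is only an analogy: the helper name `field` is hypothetical, and the choice of a Python tuple, list and dict to stand in for Pig's tuple, bag and map is an assumption made for illustration, not how Pig stores data.

```python
# Analogy only: model the Pig tuple t with plain Python values.
# Pig tuple -> Python tuple, Pig bag -> list of tuples, Pig map -> dict.
t = (
    1,                          # f1 / $0: an atom
    [(2, 3), (4, 6), (5, 7)],   # f2 / $1: a bag of tuples
    {"apache": "hadoop"},       # f3 / $2: a map
)


def field(tup, position):
    """Return the field at a zero-based position, like Pig's $position."""
    return tup[position]


first = field(t, 0)    # corresponds to t.$0
bag = field(t, 1)      # corresponds to t.$1
mapping = field(t, 2)  # corresponds to t.$2
```

The point to take away is that positions are zero-based, so $0 is the first field.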

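Pig can run in local mode or in cluster mode. Below we explain the language with a Pig script that processes an Apache log file. Our log (access.log) contains many days of access records, and we want to know how many pages each IP requested in each hour of January 30, 2007. Before running the program, make sure you are running Java 1.5 or later and have downloaded the example files.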

Local mode (non-Windows systems only):
Delete the hadoop-site.xml file, then run:

java -cp .:pig.jar org.apache.pig.Main -x local log.pig

Cluster mode (Windows is supported):
Make sure your Hadoop cluster is version 0.17.0, then edit fs.default.name, mapred.job.tracker and mapred.system.dir in hadoop-site.xml so that their values match the cluster's. Then run:

java -cp .:pig.jar org.apache.pig.Main log.pig

View the results:

cat logs/20070130;

Script walk-through:

Use Hadoop's copyFromLocal command to copy access.log to HDFS

copyFromLocal access.log access.log;

Register the jar file that contains the user-defined functions (UDFs)

REGISTER udfs.jar;

Set the MapReduce job name

set job.name 'hadoop.org.cn log parser';

Load the log file with a user-defined load function

in = LOAD 'access.log' USING  cn.org.hadoop.pig.storage.LogStorage();

The NCSA log format writes timestamps as "21/Jan/2007:15:29:24 +0800", so convert them to the form 20070121152924

gen = FOREACH in GENERATE $0,cn.org.hadoop.pig.time.FormatTime($1),*;
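
The actual FormatTime UDF is Java code shipped in udfs.jar and is not shown in this article. As a rough illustration of the conversion it performs, here is a Python sketch. The function name `format_time`, the decision to drop the time zone offset, and the choice to return an empty string for malformed input are all assumptions made for this sketch.

```python
# Hypothetical stand-in for the FormatTime UDF (the real one is Java in udfs.jar).
MONTHS = {
    "Jan": "01", "Feb": "02", "Mar": "03", "Apr": "04",
    "May": "05", "Jun": "06", "Jul": "07", "Aug": "08",
    "Sep": "09", "Oct": "10", "Nov": "11", "Dec": "12",
}


def format_time(ncsa_time):
    """Convert 'dd/Mon/yyyy:HH:MM:SS zone' to 'yyyymmddHHMMSS'.

    Returns an empty string when the input does not have that shape
    (an assumption of this sketch, so that bad rows are easy to filter).
    """
    try:
        stamp = ncsa_time.split()[0]  # drop the '+0800' zone offset
        date_part, hour, minute, second = stamp.split(":")
        day, month_name, year = date_part.split("/")
        return year + MONTHS[month_name] + day + hour + minute + second
    except (ValueError, KeyError, IndexError):
        return ""


converted = format_time("21/Jan/2007:15:29:24 +0800")
```

A fixed month table is used instead of locale-dependent date parsing so that the sketch behaves the same on any machine.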

Filter out malformed lines.

result = FILTER gen BY (NOT IsEmpty($1));

Store the result in the temp directory under the user's HDFS directory

STORE result INTO 'temp';

Set the MapReduce job name again

set job.name 'hadoop.org.cn filter parser';

Load the files in the temp directory with the default load function (PigStorage)

A = LOAD 'temp' AS (ip,date,method,url,protocol,code,bytes);

Keep only the records dated 2007-01-30

B = FILTER A BY (date MATCHES '20070130.*');

We only care about per-hour results, so we call the user-defined function ExtractTime to extract the hour of the day

C = FOREACH B GENERATE ip,cn.org.hadoop.pig.time.ExtractTime(date,'8','10') as hour;

Group the records by IP and hour with GROUP

D = GROUP C BY (ip,hour);

Count how many pages each IP requested in each hour

E = FOREACH D GENERATE flatten($0),COUNT($1);
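
For readers who find it easier to follow the logic outside Pig, steps B through E can be sketched in plain Python as below. This is not what Pig executes on the cluster; it only mirrors the data flow. It assumes that ExtractTime(date,'8','10') returns the characters from index 8 up to, but not including, index 10 of the yyyymmddHHMMSS string, and the function name and sample rows are made up for illustration.

```python
from collections import Counter


def hits_per_ip_and_hour(rows, day="20070130"):
    """rows: iterable of (ip, date) pairs, date formatted as 'yyyymmddHHMMSS'.

    Returns a list of (ip, hour, count) tuples.
    """
    counts = Counter()
    for ip, date in rows:
        if not date.startswith(day):   # B: FILTER A BY date MATCHES '20070130.*'
            continue
        hour = date[8:10]              # C: assumed behaviour of ExtractTime(date,'8','10')
        counts[(ip, hour)] += 1        # D and E: GROUP BY (ip, hour), then COUNT
    return [(ip, hour, n) for (ip, hour), n in counts.items()]


# Made-up sample rows, for illustration only.
sample_rows = [
    ("10.0.0.1", "20070130080509"),
    ("10.0.0.1", "20070130085930"),
    ("10.0.0.2", "20070130091500"),
    ("10.0.0.1", "20070129080000"),   # different day, filtered out
]
hourly_hits = hits_per_ip_and_hour(sample_rows)
```

The flatten($0) in the Pig statement plays the same role as unpacking the (ip, hour) key here: it turns the group key back into two separate output fields.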

Sort by hour in descending order

F = ORDER E BY $1 USING cn.org.hadoop.pig.sort.Desc;
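
The Desc comparator is another UDF from udfs.jar whose source is not shown here. Assuming it simply reverses the natural ordering of the sort field, the effect of this step on (ip, hour, count) rows can be sketched in Python as follows; the function name and sample rows are hypothetical.

```python
def order_by_hour_desc(rows):
    """Sort (ip, hour, count) rows by the hour field ($1), largest hour first."""
    return sorted(rows, key=lambda row: row[1], reverse=True)


# Made-up rows in the (ip, hour, count) layout produced by the previous step.
unsorted_rows = [
    ("10.0.0.1", "08", 2),
    ("10.0.0.2", "23", 5),
    ("10.0.0.3", "09", 1),
]
sorted_rows = order_by_hour_desc(unsorted_rows)
```

Because the hour is a zero-padded two-character string, comparing it as text gives the same order as comparing it as a number.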

Store the result in a directory

STORE F INTO 'logs/20070130';

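The udfs.jar file in the archive includes the source code. The archive also contains pig.vim, which adds Pig syntax highlighting to vim.
Installing pig.vim:
1. Copy pig.vim to the ~/.vim/syntax/ directory
2. Edit ~/.vimrc and add the following lines: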
augroup filetypedetect
au BufNewFile,BufRead *.pig set filetype=pig syntax=pig
augroup END

For a more detailed introduction to the Pig language, see the Pig Wiki


Comments

2 Comments to "The Pig Language"

  1. qiaohui.zhang on 2008-07-18 11:29 am

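    I have also been looking at Pig lately and find it both handy and awkward. When I run Pig on a Hadoop cluster, uploading a single log file or a directory to HDFS and running against it works without major problems, but my tests against an archived log file keep failing, as shown below. Please take a look: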
    -desktop:~/opt/pig/bin$ java -cp pig.jar:/home/quan/soft/hadoop/conf org.apache.pig.Main log.pig
    2008-07-18 11:14:36,219 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: localhost:9000
    2008-07-18 11:14:36,289 [main] WARN org.apache.hadoop.fs.FileSystem - "localhost:9000" is a deprecated filesystem name. Use "hdfs://localhost:9000/" instead.
    2008-07-18 11:14:36,555 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:9001
    2008-07-18 11:14:36,701 [main] WARN org.apache.hadoop.fs.FileSystem - "localhost:9000" is a deprecated filesystem name. Use "hdfs://localhost:9000/" instead.
    2008-07-18 11:14:37,276 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - ----- MapReduce Job -----
    2008-07-18 11:14:37,276 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Input: [TEST.tar.gz:PigStorage('|')]
    2008-07-18 11:14:37,276 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Map: [[*]->GENERATE {[PROJECT $1],[PROJECT $2]}]
    2008-07-18 11:14:37,276 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Group: [GENERATE {[PROJECT $0],[*]}]
    2008-07-18 11:14:37,276 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Combine: GENERATE {[PROJECT $0],[org.apache.pig.builtin.COUNT$Initial(GENERATE {[PROJECT $1]})]}
    2008-07-18 11:14:37,277 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Reduce: GENERATE {[FLATTEN PROJECT $0],[org.apache.pig.builtin.COUNT$Final(GENERATE {[PROJECT $1]->[PROJECT $1]})]}
    2008-07-18 11:14:37,277 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Output: 999:PigStorage()
    2008-07-18 11:14:37,277 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Split: null
    2008-07-18 11:14:37,277 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Map parallelism: -1
    2008-07-18 11:14:37,277 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Reduce parallelism: -1
    2008-07-18 11:14:38,447 [main] WARN org.apache.hadoop.fs.FileSystem - "localhost:9000" is a deprecated filesystem name. Use "hdfs://localhost:9000/" instead.
    2008-07-18 11:14:38,484 [main] WARN org.apache.hadoop.fs.FileSystem - "localhost:9000" is a deprecated filesystem name. Use "hdfs://localhost:9000/" instead.
    2008-07-18 11:14:38,579 [main] WARN org.apache.hadoop.fs.FileSystem - "localhost:9000" is a deprecated filesystem name. Use "hdfs://localhost:9000/" instead.
    2008-07-18 11:14:38,614 [main] WARN org.apache.hadoop.fs.FileSystem - "localhost:9000" is a deprecated filesystem name. Use "hdfs://localhost:9000/" instead.
    2008-07-18 11:14:40,311 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher - Pig progress = 0%
    2008-07-18 11:15:12,570 [main] ERROR org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher - Error message from task (map) tip_200807181012_0003_m_000000 java.lang.IndexOutOfBoundsException: Requested index 2 from tuple ()
    at org.apache.pig.data.Tuple.getField(Tuple.java:176)
    at org.apache.pig.impl.eval.ProjectSpec.eval(ProjectSpec.java:84)
    at org.apache.pig.impl.eval.SimpleEvalSpec$1.add(SimpleEvalSpec.java:38)
    at org.apache.pig.impl.eval.GenerateSpec$CrossProductItem.<init>(GenerateSpec.java:159)
    at org.apache.pig.impl.eval.GenerateSpec$1.add(GenerateSpec.java:79)
    at org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce.run(PigMapReduce.java:117)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:219)
    at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2124)
    java.lang.IndexOutOfBoundsException: Requested index 2 from tuple ()
    at org.apache.pig.data.Tuple.getField(Tuple.java:176)
    at org.apache.pig.impl.eval.ProjectSpec.eval(ProjectSpec.java:84)
    at org.apache.pig.impl.eval.SimpleEvalSpec$1.add(SimpleEvalSpec.java:38)
    at org.apache.pig.impl.eval.GenerateSpec$CrossProductItem.<init>(GenerateSpec.java:159)
    at org.apache.pig.impl.eval.GenerateSpec$1.add(GenerateSpec.java:79)
    at org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce.run(PigMapReduce.java:117)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:219)
    at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2124)
    java.lang.IndexOutOfBoundsException: Requested index 2 from tuple ()
    at org.apache.pig.data.Tuple.getField(Tuple.java:176)
    at org.apache.pig.impl.eval.ProjectSpec.eval(ProjectSpec.java:84)
    at org.apache.pig.impl.eval.SimpleEvalSpec$1.add(SimpleEvalSpec.java:38)
    at org.apache.pig.impl.eval.GenerateSpec$CrossProductItem.<init>(GenerateSpec.java:159)
    at org.apache.pig.impl.eval.GenerateSpec$1.add(GenerateSpec.java:79)
    at org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce.run(PigMapReduce.java:117)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:219)
    at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2124)
    java.lang.IndexOutOfBoundsException: Requested index 2 from tuple ()
    at org.apache.pig.data.Tuple.getField(Tuple.java:176)
    at org.apache.pig.impl.eval.ProjectSpec.eval(ProjectSpec.java:84)
    at org.apache.pig.impl.eval.SimpleEvalSpec$1.add(SimpleEvalSpec.java:38)
    at org.apache.pig.impl.eval.GenerateSpec$CrossProductItem.<init>(GenerateSpec.java:159)
    at org.apache.pig.impl.eval.GenerateSpec$1.add(GenerateSpec.java:79)
    at org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce.run(PigMapReduce.java:117)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:219)
    at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2124)

    2008-07-18 11:15:12,589 [main] ERROR org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher - Error message from task (reduce) tip_200807181012_0003_r_000000
    2008-07-18 11:15:12,592 [main] ERROR org.apache.pig.tools.grunt.Grunt - java.io.IOException: Unable to store alias null
    at org.apache.pig.impl.util.WrappedIOException.wrap(WrappedIOException.java:16)
    at org.apache.pig.PigServer.registerQuery(PigServer.java:296)
    at org.apache.pig.tools.grunt.GruntParser.processPig(GruntParser.java:457)
    at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:233)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:63)
    at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:60)
    at org.apache.pig.Main.main(Main.java:294)
    Caused by: org.apache.pig.backend.executionengine.ExecException: java.io.IOException: Job failed
    at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.execute(HExecutionEngine.java:291)
    at org.apache.pig.PigServer.optimizeAndRunQuery(PigServer.java:413)
    at org.apache.pig.PigServer.registerQuery(PigServer.java:293)
    ... 5 more
    Caused by: java.io.IOException: Job failed
    at org.apache.pig.backend.hadoop.executionengine.POMapreduce.open(POMapreduce.java:188)
    at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.execute(HExecutionEngine.java:277)
    ... 7 more

    2008-07-18 11:15:12,592 [main] ERROR org.apache.pig.tools.grunt.Grunt - Unable to store alias null

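    I wonder whether compression and decompression settings need to be configured in the hadoop-site file. Does Hadoop document this anywhere? Any pointers would be appreciated.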

  2. admin on 2008-07-18 2:33 pm

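    Pig does support gz compression: *.gz files are decompressed and read with GZIPInputStream (for details, see org.apache.pig.backend.executionengine.PigSlice). However, it does not yet support reading such a file in chunks, that is, it does not produce multiple InputSplits for it. A later version is expected to support multiple InputSplits: http://issues.apache.org/jira/browse/PIG-42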

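    I see that your input file is TEST.tar.gz.
    My guess is that you created it with tar -czf, which means tar invoked gzip compression while building the archive. After GZIPInputStream decompresses it, what remains is a tar file rather than your actual text file, which is why your tuples are wrong. You can compress your files with gzip or bzip2 instead.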
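
    The difference between a plain .gz file and a .tar.gz archive can be checked with a short, self-contained Python sketch. It uses only the standard library and an in-memory, made-up log line; it does not involve Pig or Hadoop, and is meant only to show why gunzipping a .tar.gz does not give back the text.

```python
import gzip
import io
import tarfile

# Made-up log line with '|' delimiters, for illustration only.
log_text = b"1.2.3.4|GET|/index.html\n"

# Case 1: gzip only. Decompressing returns the original text.
plain_gz = gzip.compress(log_text)
from_plain_gz = gzip.decompress(plain_gz)

# Case 2: tar + gzip, as produced by `tar -czf`.
buffer = io.BytesIO()
with tarfile.open(fileobj=buffer, mode="w:gz",
                  format=tarfile.USTAR_FORMAT) as archive:
    info = tarfile.TarInfo(name="access.log")
    info.size = len(log_text)
    archive.addfile(info, io.BytesIO(log_text))

# Decompressing only the gzip layer leaves a tar stream: a 512-byte
# header block (file name, sizes, 'ustar' magic) followed by the data.
from_tar_gz = gzip.decompress(buffer.getvalue())
header_name = from_tar_gz[:10]
tar_magic = from_tar_gz[257:262]
```

    A line-oriented loader that splits on '|' would see the tar header bytes at the start of the first line instead of the expected fields, which is consistent with the malformed tuples in the error above.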
