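The Pig Language
Author: Ma Shihua. Published: 2008-07-02 14:00. Last updated: 2008-07-02 14:01. Copyright notice: this article may be freely reproduced; when reproducing it, please credit the original source and author with a hyperlink.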
http://www.hadoop.org.cn/hadoop/pig-language/
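Pig is a project that Yahoo donated to Apache. It is still in the Apache Incubator, but its basic functionality is already usable, so today I will introduce this handy tool. Pig is a SQL-like language: a high-level query language built on top of MapReduce that compiles operations into the Map and Reduce phases of the MapReduce model, and it lets users define their own functions. It is yet another project from Yahoo's grid computing team modeled on a Google project, in this case Sawzall.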
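Supported operations
Arithmetic operators: +, -, *, /
Multiple data types: string, int, float, long, and others
Comparison operators: ==, !=, >, >=, <, <=, eq, neq, gt, gte, lt, lte, matches
Complex data types: bag, tuple, map
Relational operators: FILTER, GROUP BY, ORDER, DISTINCT, UNION, JOIN, FOREACH ... GENERATE
Aggregate functions: COUNT, SUM, AVG, MIN, MAX, and others.
Primitive data types supported by Pig: int, long, float, double, char array, byte array
Pig's internal data types:
bag: a collection of tuples, written as {<1,2>,<3,4>}
tuple: an ordered sequence of fields, written as <pig,3.14>
map: a set of key/value pairs, written as ['pig':<'load','store'>,'web':'hadoop.apache.org']
atom: a single primitive value, stored as a string but convertible to a numeric type, written as 'apache.org' or '2.3'.
Data representation: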
t = < 1, {<2,3>,<4,6>,<5,7>}, ['apache':'hadoop']>
In the example above, a tuple is bound to the name t, so t has three fields: f1, f2, and f3. t.f1 or t.$0 yields 1, t.f2 or t.$1 yields {<2,3>,<4,6>,<5,7>}, and t.f3 or t.$2 yields ['apache':'hadoop'].
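To make the positional access concrete, here is a minimal Python sketch that models the tuple t with ordinary Python containers. It is only an analogy and not Pig's API: the helper name field and the choice of a list for the bag and a dict for the map are assumptions made for illustration.

```python
# Hypothetical Python model of the tuple t from the text; this is not Pig's API.
t = (
    1,                         # $0 / f1: an atom
    [(2, 3), (4, 6), (5, 7)],  # $1 / f2: a bag, modeled as a list of tuples
    {"apache": "hadoop"},      # $2 / f3: a map, modeled as a dict
)


def field(tup, position):
    """Return the field at a zero-based position, mirroring Pig's $N notation."""
    return tup[position]


print(field(t, 0))  # the atom
print(field(t, 1))  # the bag
print(field(t, 2))  # the map
```

The point of the sketch is that $0, $1, and $2 are zero-based positions, so a three-field tuple has no $3.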
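Pig can run in local mode or in cluster mode. As before, we will use Apache log processing as the example Pig script for explaining the language. Our log (access.log) contains many days of access records, and we want to know how many pages each IP requested in each hour of January 30, 2007. Before running the program, make sure you are running Java 1.5 or later and have downloaded the example files.
Local mode (non-Windows systems only):
Delete the hadoop-site.xml file, then run: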
java -cp .:pig.jar org.apache.pig.Main -x local log.pig
Cluster mode (Windows is supported):
Make sure your Hadoop cluster is version 0.17.0, then edit fs.default.name, mapred.job.tracker, and mapred.system.dir in hadoop-site.xml so that their values match the cluster's. Then run:
java -cp .:pig.jar org.apache.pig.Main log.pig
View the results:
cat logs/20070130;
Script walkthrough:
Use Hadoop's copyFromLocal command to copy access.log to HDFS
copyFromLocal access.log access.log;
Register the jar file that contains the user-defined functions (UDFs)
REGISTER udfs.jar;
Set the MapReduce job name
set job.name 'hadoop.org.cn log parser';
Load the log file with a user-defined load function
in = LOAD 'access.log' USING cn.org.hadoop.pig.storage.LogStorage();
The NCSA log format writes dates as "21/Jan/2007:15:29:24 +0800", so convert them to the form 20070121152924
gen = FOREACH in GENERATE $0,cn.org.hadoop.pig.time.FormatTime($1),*;
Filter out malformed lines.
result = FILTER gen BY (NOT IsEmpty($1));
Store the result in the temp directory under the user's HDFS home
STORE result INTO 'temp';
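The timestamp conversion in this first job can be sketched in Python as follows. The source of cn.org.hadoop.pig.time.FormatTime is not shown in this article, so this is only a guess at equivalent behavior. In particular, returning an empty string for an unparsable timestamp is an assumption, chosen because the script then drops rows with NOT IsEmpty($1).

```python
import re

# Month abbreviations as they appear in NCSA-style log timestamps.
MONTHS = {
    name: f"{number:02d}"
    for number, name in enumerate(
        ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
         "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"],
        start=1,
    )
}

# Matches e.g. "21/Jan/2007:15:29:24 +0800"; the UTC offset is matched but dropped.
TIMESTAMP = re.compile(r"(\d{2})/([A-Za-z]{3})/(\d{4}):(\d{2}):(\d{2}):(\d{2}) [+-]\d{4}")


def format_time(ncsa_timestamp):
    """Convert an NCSA timestamp to yyyyMMddHHmmss, or '' if it is malformed.

    The empty-string result for bad input is an assumption, not confirmed
    behavior of the FormatTime UDF.
    """
    match = TIMESTAMP.fullmatch(ncsa_timestamp)
    if match is None or match.group(2) not in MONTHS:
        return ""
    day, month, year, hour, minute, second = match.groups()
    return year + MONTHS[month] + day + hour + minute + second


print(format_time("21/Jan/2007:15:29:24 +0800"))
```

A fixed-width, most-significant-first format such as 20070121152924 is convenient here because a whole day can later be selected by a simple prefix match and the hour sits at a fixed character offset.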
Set a new MapReduce job name
set job.name 'hadoop.org.cn filter parser';
Load the files in the temp directory with the default load function (PigStorage)
A = LOAD 'temp' AS (ip,date,method,url,protocol,code,bytes);
Select the records dated 2007-01-30
B = FILTER A BY (date MATCHES '20070130.*');
We only care about per-hour results, so call the user-defined function ExtractTime to extract the hour of the day
C = FOREACH B GENERATE ip,cn.org.hadoop.pig.time.ExtractTime(date,'8','10') as hour;
Group with the GROUP operator
D = GROUP C BY (ip,hour);
Count how many pages each IP requested in each hour
E = FOREACH D GENERATE flatten($0),COUNT($1);
Sort by hour in descending order
F = ORDER E BY $1 USING cn.org.hadoop.pig.sort.Desc;
Store the result in the output directory
STORE F INTO 'logs/20070130';
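For readers who want to check the logic of steps B through F without a cluster, the following Python sketch runs the same filter, group, count, and sort on a few in-memory rows. The sample rows are invented for illustration, and reading ExtractTime(date,'8','10') as "characters 8 up to 10 of the timestamp" is an assumption, since the UDF source is not shown here.

```python
from collections import Counter

# Hypothetical (ip, date) rows in the yyyyMMddHHmmss form produced by the first job.
rows = [
    ("10.0.0.1", "20070130091500"),
    ("10.0.0.1", "20070130094000"),
    ("10.0.0.2", "20070130091000"),
    ("10.0.0.1", "20070130170000"),
    ("10.0.0.2", "20070129235900"),
]


def hourly_hits(rows, day):
    """Return (ip, hour, count) rows for one day, sorted by hour descending."""
    # B: keep only rows whose timestamp starts with the requested day.
    same_day = [(ip, date) for ip, date in rows if date.startswith(day)]
    # C: assume the hour is the two characters at offsets 8 and 9.
    with_hour = [(ip, date[8:10]) for ip, date in same_day]
    # D and E: group by (ip, hour) and count the rows in each group.
    counts = Counter(with_hour)
    flattened = [(ip, hour, n) for (ip, hour), n in counts.items()]
    # F: order by the hour field ($1), descending.
    return sorted(flattened, key=lambda row: row[1], reverse=True)


result = hourly_hits(rows, "20070130")
for row in result:
    print(row)
```

Each output row has the same shape as the rows of F in the script: the flattened group key (ip, hour) followed by the count.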
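The udfs.jar file in the archive includes the source code, and the archive also contains pig.vim, a Vim syntax-highlighting file for the Pig language.
Installing pig.vim:
1. Copy pig.vim to the ~/.vim/syntax/ directory
2. Edit ~/.vimrc and add the following lines: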
augroup filetypedetect
au BufNewFile,BufRead *.pig set filetype=pig syntax=pig
augroup END
For a more detailed introduction to the Pig language, please visit the Pig Wiki.
Comments
2 Comments to “The Pig Language”
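I have also been looking at Pig recently and find it handy in some ways and not in others. When I run Pig on a Hadoop cluster, uploading a single log file or a directory to HDFS and running against it works without major problems, but analyzing a packaged (archived) log file keeps failing, as shown below. Please take a look: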
-desktop:~/opt/pig/bin$ java -cp pig.jar:/home/quan/soft/hadoop/conf org.apache.pig.Main log.pig
2008-07-18 11:14:36,219 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: localhost:9000
2008-07-18 11:14:36,289 [main] WARN org.apache.hadoop.fs.FileSystem - "localhost:9000" is a deprecated filesystem name. Use "hdfs://localhost:9000/" instead.
2008-07-18 11:14:36,555 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:9001
2008-07-18 11:14:36,701 [main] WARN org.apache.hadoop.fs.FileSystem - "localhost:9000" is a deprecated filesystem name. Use "hdfs://localhost:9000/" instead.
2008-07-18 11:14:37,276 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - ----- MapReduce Job -----
2008-07-18 11:14:37,276 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Input: [TEST.tar.gz:PigStorage('|')]
2008-07-18 11:14:37,276 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Map: [[*]->GENERATE {[PROJECT $1],[PROJECT $2]}]
2008-07-18 11:14:37,276 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Group: [GENERATE {[PROJECT $0],[*]}]
2008-07-18 11:14:37,276 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Combine: GENERATE {[PROJECT $0],[org.apache.pig.builtin.COUNT$Initial(GENERATE {[PROJECT $1]})]}
2008-07-18 11:14:37,277 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Reduce: GENERATE {[FLATTEN PROJECT $0],[org.apache.pig.builtin.COUNT$Final(GENERATE {[PROJECT $1]->[PROJECT $1]})]}
2008-07-18 11:14:37,277 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Output: 999:PigStorage()
2008-07-18 11:14:37,277 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Split: null
2008-07-18 11:14:37,277 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Map parallelism: -1
2008-07-18 11:14:37,277 [main] INFO org.apache.pig.backend.hadoop.executionengine.POMapreduce - Reduce parallelism: -1
2008-07-18 11:14:38,447 [main] WARN org.apache.hadoop.fs.FileSystem - "localhost:9000" is a deprecated filesystem name. Use "hdfs://localhost:9000/" instead.
2008-07-18 11:14:38,484 [main] WARN org.apache.hadoop.fs.FileSystem - "localhost:9000" is a deprecated filesystem name. Use "hdfs://localhost:9000/" instead.
2008-07-18 11:14:38,579 [main] WARN org.apache.hadoop.fs.FileSystem - "localhost:9000" is a deprecated filesystem name. Use "hdfs://localhost:9000/" instead.
2008-07-18 11:14:38,614 [main] WARN org.apache.hadoop.fs.FileSystem - "localhost:9000" is a deprecated filesystem name. Use "hdfs://localhost:9000/" instead.
2008-07-18 11:14:40,311 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher - Pig progress = 0%
2008-07-18 11:15:12,570 [main] ERROR org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher - Error message from task (map) tip_200807181012_0003_m_000000 java.lang.IndexOutOfBoundsException: Requested index 2 from tuple ()
at org.apache.pig.data.Tuple.getField(Tuple.java:176)
at org.apache.pig.impl.eval.ProjectSpec.eval(ProjectSpec.java:84)
at org.apache.pig.impl.eval.SimpleEvalSpec$1.add(SimpleEvalSpec.java:38)
at org.apache.pig.impl.eval.GenerateSpec$CrossProductItem.<init>(GenerateSpec.java:159)
at org.apache.pig.impl.eval.GenerateSpec$1.add(GenerateSpec.java:79)
at org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce.run(PigMapReduce.java:117)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:219)
at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2124)
java.lang.IndexOutOfBoundsException: Requested index 2 from tuple ()
at org.apache.pig.data.Tuple.getField(Tuple.java:176)
at org.apache.pig.impl.eval.ProjectSpec.eval(ProjectSpec.java:84)
at org.apache.pig.impl.eval.SimpleEvalSpec$1.add(SimpleEvalSpec.java:38)
at org.apache.pig.impl.eval.GenerateSpec$CrossProductItem.<init>(GenerateSpec.java:159)
at org.apache.pig.impl.eval.GenerateSpec$1.add(GenerateSpec.java:79)
at org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce.run(PigMapReduce.java:117)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:219)
at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2124)
java.lang.IndexOutOfBoundsException: Requested index 2 from tuple ()
at org.apache.pig.data.Tuple.getField(Tuple.java:176)
at org.apache.pig.impl.eval.ProjectSpec.eval(ProjectSpec.java:84)
at org.apache.pig.impl.eval.SimpleEvalSpec$1.add(SimpleEvalSpec.java:38)
at org.apache.pig.impl.eval.GenerateSpec$CrossProductItem.<init>(GenerateSpec.java:159)
at org.apache.pig.impl.eval.GenerateSpec$1.add(GenerateSpec.java:79)
at org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce.run(PigMapReduce.java:117)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:219)
at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2124)
java.lang.IndexOutOfBoundsException: Requested index 2 from tuple ()
at org.apache.pig.data.Tuple.getField(Tuple.java:176)
at org.apache.pig.impl.eval.ProjectSpec.eval(ProjectSpec.java:84)
at org.apache.pig.impl.eval.SimpleEvalSpec$1.add(SimpleEvalSpec.java:38)
at org.apache.pig.impl.eval.GenerateSpec$CrossProductItem.<init>(GenerateSpec.java:159)
at org.apache.pig.impl.eval.GenerateSpec$1.add(GenerateSpec.java:79)
at org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce.run(PigMapReduce.java:117)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:219)
at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2124)
2008-07-18 11:15:12,589 [main] ERROR org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher - Error message from task (reduce) tip_200807181012_0003_r_000000
2008-07-18 11:15:12,592 [main] ERROR org.apache.pig.tools.grunt.Grunt - java.io.IOException: Unable to store alias null
at org.apache.pig.impl.util.WrappedIOException.wrap(WrappedIOException.java:16)
at org.apache.pig.PigServer.registerQuery(PigServer.java:296)
at org.apache.pig.tools.grunt.GruntParser.processPig(GruntParser.java:457)
at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:233)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:63)
at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:60)
at org.apache.pig.Main.main(Main.java:294)
Caused by: org.apache.pig.backend.executionengine.ExecException: java.io.IOException: Job failed
at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.execute(HExecutionEngine.java:291)
at org.apache.pig.PigServer.optimizeAndRunQuery(PigServer.java:413)
at org.apache.pig.PigServer.registerQuery(PigServer.java:293)
... 5 more
Caused by: java.io.IOException: Job failed
at org.apache.pig.backend.hadoop.executionengine.POMapreduce.open(POMapreduce.java:188)
at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.execute(HExecutionEngine.java:277)
... 7 more
2008-07-18 11:15:12,592 [main] ERROR org.apache.pig.tools.grunt.Grunt - Unable to store alias null
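I am not sure whether I need to configure compression or decompression settings in the hadoop-site file. Does Hadoop have any documentation on this? Any pointers would be appreciated.
Pig does support gz compression: *.gz files are decompressed and read through GZIPInputStream (for details, see org.apache.pig.backend.executionengine.PigSlice). However, split reading is not supported yet, meaning that multiple InputSplits are not generated for a compressed file. A future version will support generating multiple InputSplits. http://issues.apache.org/jira/browse/PIG-42
I see that your input file is TEST.tar.gz.
My guess is that you created it with tar -czf, which means tar bundled the files into an archive and invoked gzip to compress it in the same step. After GZIPInputStream decompresses it, what remains is a tar file, not your actual text file, which is why your Tuple produces an error. Compress your files directly with gzip or bzip2 instead.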
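The difference is easy to reproduce outside Pig. The Python sketch below builds a .tar.gz and a plain .gz in memory from the same log line, then strips only the gzip layer from each, which is all a gzip stream reader does. The pipe-delimited line and the file name access.log are hypothetical, picked to resemble the PigStorage('|') input in the log above.

```python
import gzip
import io
import tarfile

# Hypothetical pipe-delimited log line; the real file contents are not shown.
log_text = b"10.0.0.1|GET|/index.html\n"

# Build the equivalent of a .tar.gz archive in memory.
tar_gz = io.BytesIO()
with tarfile.open(fileobj=tar_gz, mode="w:gz") as archive:
    info = tarfile.TarInfo(name="access.log")
    info.size = len(log_text)
    archive.addfile(info, io.BytesIO(log_text))

# Removing only the gzip layer leaves a tar stream: a 512-byte header block
# comes before the file data, so the first "line" is not a log record.
after_gunzip = gzip.decompress(tar_gz.getvalue())
first_line_from_tar = after_gunzip.split(b"\n", 1)[0]

# A file compressed with plain gzip decompresses straight back to the text.
plain_gz = gzip.compress(log_text)
first_line_from_gz = gzip.decompress(plain_gz).split(b"\n", 1)[0]

print(len(after_gunzip), first_line_from_tar[:10])
print(first_line_from_gz)
```

With the tar layer still present, the reader sees header bytes and NUL padding mixed into the records, so field lookups by position cannot be trusted.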