七度黑光 http://zhwen.org 完美之道,不在无可增加,而在无可删减。 Thu, 16 Mar 2017 03:53:38 +0000 en-US hourly 1 http://wordpress.org/?v=4.3.1 ElasticSearch5.2.2安装坑记录 http://zhwen.org/?p=970 http://zhwen.org/?p=970#comments Thu, 16 Mar 2017 03:53:38 +0000 http://zhwen.org/?p=970

Related posts:

  1. 使用Spark分析网站日志
]]>
部署完成ES之后,如果我们不修改配置,那么默认只有本机可以访问ES的api接口,如果需要给外部机器访问,那么就需要修改ES的配置了。

在默认情况下,ES只允许本地访问api接口,如果我们希望在另外一台机器上访问ES的接口的话,需要配置主机地址:
/data/elasticsearch-5.2.2> vim config/elasticsearch.yml

#network.host: 192.168.0.1
network.host: 10.140.7.12

保存退出,重新启动es,一般都会报错,无法启动

[2017-03-16T10:51:23,168][INFO ][o.e.t.TransportService ] [DwX_4EG] publish_address {10.140.7.12:9300}, bound_addresses {10.140.7.12:9300}
[2017-03-16T10:51:23,176][INFO ][o.e.b.BootstrapChecks ] [DwX_4EG] bound or publishing to a non-loopback or non-link-local address, enforcing bootstrap checks
ERROR: bootstrap checks failed
max virtual memory areas vm.max_map_count [65536] is too low, increase to at least [262144]

对于这个错误,需要这样处理,执行下面的命令,或者把这个配置/etc/sysctl.conf
sysctl -w vm.max_map_count=262144

再次启动,报下面的错误:
system call filters failed to install; check the logs and fix your configuration or disable system call filters at your own risk
原因:
这是在因为操作系统不支持SecComp,而ES5.2.2默认bootstrap.system_call_filter为true进行检测,所以导致检测失败,失败后直接导致ES不能启动。
解决:
在elasticsearch.yml中配置bootstrap.system_call_filter为false,注意要在Memory下面:
bootstrap.memory_lock: false
bootstrap.system_call_filter: false

重启ok
一下是网友遇到的问题,也一并记录一下:
问题一:警告提示
[2016-12-20T22:37:28,543][INFO ][o.e.b.BootstrapCheck ] [elk-node1] bound or publishing to a non-loopback or non-link-local address, enforcing bootstrap checks
[2016-12-20T22:37:28,552][ERROR][o.e.b.Bootstrap ] [elk-node1] node validation exception
bootstrap checks failed
max number of threads [1024] for user [elasticsearch] is too low, increase to at least [2048]
[2016-12-20T22:37:28,560][INFO ][o.e.n.Node ] [elk-node1] stopping …
[2016-12-20T22:37:28,628][INFO ][o.e.n.Node ] [elk-node1] stopped
[2016-12-20T22:37:28,629][INFO ][o.e.n.Node ] [elk-node1] closing …
[2016-12-20T22:37:28,677][INFO ][o.e.n.Node ] [elk-node1] closed

报了一大串错误,其实只是一个警告。

解决:使用心得linux版本,就不会出现此类问题了。

问题二:ERROR: bootstrap checks failed
max file descriptors [4096] for elasticsearch process likely too low, increase to at least [65536]
max number of threads [1024] for user [lishang] likely too low, increase to at least [2048]
解决:切换到root用户,编辑limits.conf 添加类似如下内容
vi /etc/security/limits.conf

添加如下内容:

* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096

问题三:max number of threads [1024] for user [lish] likely too low, increase to at least [2048]

解决:切换到root用户,进入limits.d目录下修改配置文件。
vi /etc/security/limits.d/90-nproc.conf
修改如下内容:

* soft nproc 1024
#修改为
* soft nproc 2048

问题四:max virtual memory areas vm.max_map_count [65530] likely too low, increase to at least [262144]

解决:切换到root用户修改配置sysctl.conf

vi /etc/sysctl.conf
添加下面配置:
vm.max_map_count=655360
并执行命令:
sysctl -p

]]>
http://zhwen.org/?feed=rss2&p=970 0
为游戏分析设计的分布式数据存储系统 http://zhwen.org/?p=967 http://zhwen.org/?p=967#comments Tue, 27 Dec 2016 04:07:59 +0000 http://zhwen.org/?p=967 中国云计算大会中的分享内容,主要是介绍腾讯游戏数据分析平台的一些后台架构设计和游戏分析思路。 在“大数据”相对泛滥的今天,我们看到很多讨论各种大数据架构、存储、工具、算法等等。但是大数据工具在具体应用场景中的计算各有不同之处,那在游戏数据分析中我们腾讯是怎么做的呢?本话题将简单介绍腾讯游戏数据分析系统的后台架构,并且主要介绍一个为游戏分析这类场景设计开发的的小型数据存储系统。 ]]> 本文是今年5月在中国云计算大会中的分享内容,主要是介绍腾讯游戏数据分析平台的一些后台架构设计和游戏分析思路。

在“大数据”相对泛滥的今天,我们看到很多讨论各种大数据架构、存储、工具、算法等等。但是大数据工具在具体应用场景中的计算各有不同之处,那在游戏数据分析中我们腾讯是怎么做的呢?本话题将简单介绍腾讯游戏数据分析系统的后台架构,并且主要介绍一个为游戏分析这类场景设计开发的的小型数据存储系统。

]]>
http://zhwen.org/?feed=rss2&p=967 0
使用Spark分析网站日志 http://zhwen.org/?p=964 http://zhwen.org/?p=964#comments Thu, 29 Sep 2016 08:28:01 +0000 http://zhwen.org/?p=964 tt 在sparkshell中执行下面的代码: val line = sc.textFile("/data1/data/t1") line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).map(e => (e._2, e._1)).reduceByKey(_+","+_).sortByKey(true,1).saveAsTextFile("/data1/data/t3") 2)最后的结果t3的内容如下,发现这几个ip的访问量非常大,尤其191.96.249.53 。。。。。 (855,182.92.148.207) (3100,121.8.136.75) (3889,61.135.169.81) (53513,191.96.249.53) 3)再搞一个iptables限制,搞定。spark做这种统计分析还是非常简单的,就是一行代码搞定分析。 root@iZ28bhfjhgkZ:/var/log# iptables -L Chain INPUT (policy ACCEPT) target prot opt source destination Chain FORWARD (policy ACCEPT) target prot opt source destination Chain OUTPUT (policy ACCEPT) target prot opt source destination root@iZ28bhfjhgkZ:/var/log# iptables -A INPUT -s 191.96.249.53 -j DROP root@iZ28bhfjhgkZ:/var/log# iptables -L Chain INPUT (policy ACCEPT) target prot opt source destination DROP all -- DEDICATED.SERVER anywhere Chain FORWARD (policy ACCEPT) target prot opt source destination Chain OUTPUT (policy ACCEPT) target prot opt source destination root@iZ28bhfjhgkZ:/var/log# ]]> 郁闷从昨天开始个人网站不断的发出告警504错误,登录机器看了一下是php-fpm报错,这个错误重启php-fpm后,几个小时就告警,快一年了都没什么问题,奇怪
[28-Sep-2016 11:53:19] NOTICE: ready to handle connections
[28-Sep-2016 11:53:19] NOTICE: systemd monitor interval set to 10000ms
[28-Sep-2016 11:53:26] WARNING: [pool www] server reached pm.max_children setting (5), consider raising it
[28-Sep-2016 13:46:35] WARNING: [pool www] server reached pm.max_children setting (5), consider raising it
[28-Sep-2016 13:49:32] WARNING: [pool www] server reached pm.max_children setting (5), consider raising it

以为是这个值设置的太小了,所以修改了配置修改大了值
[28-Sep-2016 15:51:43] NOTICE: fpm is running, pid 28179
[28-Sep-2016 15:51:43] NOTICE: ready to handle connections
[28-Sep-2016 15:51:43] NOTICE: systemd monitor interval set to 10000ms
[28-Sep-2016 15:52:12] WARNING: [pool www] seems busy (you may need to increase pm.start_servers, or pm.min/max_spare_servers), spawning 8 children, there are 0 idle, and 7 total children
[28-Sep-2016 16:15:58] WARNING: [pool www] server reached pm.max_children setting (20), consider raising it
[28-Sep-2016 16:52:32] WARNING: [pool www] server reached pm.max_children setting (20), consider raising it
[28-Sep-2016 16:53:05] WARNING: [pool www] server reached pm.max_children setting (20), consider raising it
[28-Sep-2016 16:55:17] WARNING: [pool www] server reached pm.max_children setting (20), consider raising it

结果后来还是一样,几个小时之后再次504告警,再看nginx的日志,发现一些奇怪的ip访问量非常大。。。有怀疑是有恶意ip的访问,看来有必要查查访问日志中的ip访问量
root@iZ28bhfjhgkZ:/var/log/nginx# vim access.log
121.42.53.180 – – [25/Sep/2016:06:26:29 +0800] “POST /wp-cron.php?doing_wp_cron=1474755989.0131719112396240234375 HTTP/1.0” 499 0 “-” “WordPress/4.3.1; http://zhwen.org”
182.92.148.207 – – [25/Sep/2016:06:26:29 +0800] “GET / HTTP/1.1” 200 41253 “-” “Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)”
203.208.60.226 – – [25/Sep/2016:06:28:55 +0800] “GET /?p=675 HTTP/1.1” 200 8204 “-” “Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)”
203.208.60.226 – – [25/Sep/2016:06:28:57 +0800] “GET /wp-content/themes/sparkling/inc/css/font-awesome.min.css?ver=4.3.1 HTTP/1.1” 200 26711 “http://zhwen.org/?p=675” “Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)”
203.208.60.226 – – [25/Sep/2016:06:28:57 +0800] “GET /wp-content/plugins/wp-pagenavi/pagenavi-css.css?ver=2.70 HTTP/1.1” 200 374 “http://zhwen.org/?p=675” “Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)”
203.208.60.226 – – [25/Sep/2016:06:28:58 +0800] “GET /wp-content/plugins/yet-another-related-posts-plugin/style/widget.css?ver=4.3.1 HTTP/1.1” 200 771 “http://zhwen.org/?p=675” “Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)”
121.43.107.174 – – [25/Sep/2016:06:29:18 +0800] “GET / HTTP/1.1” 200 41253 “-” “Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)”
115.28.189.208 – – [25/Sep/2016:06:29:33 +0800] “GET / HTTP/1.1” 200 41253 “-” “Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)”
42.156.139.59 – – [25/Sep/2016:06:30:58 +0800] “GET /?paged=14 HTTP/1.1” 200 11164 “-” “YisouSpider”
182.92.148.207 – – [25/Sep/2016:06:31:29 +0800] “GET / HTTP/1.1” 200 41253 “-” “Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)”
61.135.169.81 – – [25/Sep/2016:06:34:14 +0800] “GET /?p=articles/cscope-tags HTTP/1.1” 200 10681 “-” “Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12) AppleWebKit/602.1.50 (KHTML, like Gecko)”
61.135.169.81 – – [25/Sep/2016:06:34:14 +0800] “GET /apple-touch-icon-precomposed.png HTTP/1.1” 404 151 “-” “Safari/12602.1.50.0.10 CFNetwork/807.0.4 Darwin/16.0.0 (x86_64)”

所以对访问日志的ip做了一个简单统计:
1)先把ip取出来(为了减少数据量,其实也可以直接压缩后下载到本地),再下载到本地
root@iZ28bhfjhgkZ:/var/log/nginx# cat access.log|awk ‘{print $1}’ > tt

在sparkshell中执行下面的代码:
val line = sc.textFile(“/data1/data/t1″)
line.flatMap(_.split(” “)).map((_,1)).reduceByKey(_+_).map(e => (e._2, e._1)).reduceByKey(_+”,”+_).sortByKey(true,1).saveAsTextFile(“/data1/data/t3”)

2)最后的结果t3的内容如下,发现这几个ip的访问量非常大,尤其191.96.249.53
。。。。。
(855,182.92.148.207)
(3100,121.8.136.75)
(3889,61.135.169.81)
(53513,191.96.249.53)

3)再搞一个iptables限制,搞定。spark做这种统计分析还是非常简单的,就是一行代码搞定分析。
root@iZ28bhfjhgkZ:/var/log# iptables -L
Chain INPUT (policy ACCEPT)
target prot opt source destination

Chain FORWARD (policy ACCEPT)
target prot opt source destination

Chain OUTPUT (policy ACCEPT)
target prot opt source destination
root@iZ28bhfjhgkZ:/var/log# iptables -A INPUT -s 191.96.249.53 -j DROP
root@iZ28bhfjhgkZ:/var/log# iptables -L
Chain INPUT (policy ACCEPT)
target prot opt source destination
DROP all — DEDICATED.SERVER anywhere

Chain FORWARD (policy ACCEPT)
target prot opt source destination

Chain OUTPUT (policy ACCEPT)
target prot opt source destination
root@iZ28bhfjhgkZ:/var/log#

]]>
http://zhwen.org/?feed=rss2&p=964 0
Spark分区器HashPartitioner详解和扩展 http://zhwen.org/?p=951 http://zhwen.org/?p=951#comments Mon, 30 Nov 2015 08:33:44 +0000 http://zhwen.org/?p=951 1 HashPartitioner分区 HashPartitioner分区的原理很简单,对于给定的key,计算其hashCode,并除于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID。实现如下: class HashPartitioner(partitions: Int) extends Partitioner { require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.") def numPartitions: Int = partitions // 分片数初始化 def getPartition(key: Any): Int = key match { case null => 0 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) //对key的hashCode进行按照numPartitions取模,这里返回的是一个正整数。 } override def equals(other: Any): Boolean = other match { case h: HashPartitioner => h.numPartitions == numPartitions case _ => false } override def hashCode: Int = numPartitions } 下面是string的hashCode的实现,这里可以看出这hash其实就是很简单的一个字符串按int累加。最后返回的也是一个整型值。 public int hashCode() { int h = hash; if (h == 0 && value.length > 0) { char val[] = value; for (int i = 0; i < value.length; i++) { h = 31 * h + val[i]; } hash = h; } return h; } 从这里看出partitioner的实现是非常简单的,但是实际工作中这个partitioner规则有可能要我们按照自己的数据规则重新定义,就需要扩展partitioner了。下面是我们扩展cityhash算法的partitioner。

2 自定义Partitioner扩展

import org.apache.spark.Partitioner; public class CityHashPartitioner extends Partitioner { static { System.loadLibrary("cityhash"); //加载cityhash的so文件 } private int numParts; public CityHashPartitioner(int numParts) { this.numParts = numParts; } public int getPartition(Object key) { return (int) cityhashJNI.CityHash64IdataMod(key.toString(), key.toString().length(), numParts); //调用cityhash的取模函数,进行取模计算。 } public int numPartitions() { return numParts; } } 还是比较简单,使用时直接按照下面的方式使用即可。 CityHashPartitioner partitioner = new CityHashPartitioner(partnum); world_rdd.repartitionAndSortWithinPartitions(partitioner) .saveAsHadoopFile(world_out_path, String.class, String[].class, TextFileOutFormat.class); TextFileOutFormat又是一个自定义的文件输出类。 ]]>
在Spark中,存在两类分区函数:HashPartitioner和RangePartitioner,它们都是继承自Partitioner,主要提供了每个RDD有几个分区(numPartitions)以及对于给定的值返回一个分区ID(0~numPartitions-1),也就是决定这个值是属于那个分区的。

1 HashPartitioner分区

HashPartitioner分区的原理很简单,对于给定的key,计算其hashCode,并除于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID。实现如下:

class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

def numPartitions: Int = partitions // 分片数初始化

def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
//对key的hashCode进行按照numPartitions取模,这里返回的是一个正整数。
}

override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
下面是string的hashCode的实现,这里可以看出这hash其实就是很简单的一个字符串按int累加。最后返回的也是一个整型值。

public int hashCode() {
int h = hash;
if (h == 0 && value.length > 0) {
char val[] = value;
for (int i = 0; i < value.length; i++) {
h = 31 * h + val[i];
}
hash = h;
}
return h;
}

从这里看出partitioner的实现是非常简单的,但是实际工作中这个partitioner规则有可能要我们按照自己的数据规则重新定义,就需要扩展partitioner了。下面是我们扩展cityhash算法的partitioner。

2 自定义Partitioner扩展


import org.apache.spark.Partitioner;

public class CityHashPartitioner extends Partitioner {
static {
System.loadLibrary(“cityhash”); //加载cityhash的so文件
}
private int numParts;

public CityHashPartitioner(int numParts) {
this.numParts = numParts;
}

public int getPartition(Object key) {
return (int) cityhashJNI.CityHash64IdataMod(key.toString(), key.toString().length(), numParts);
//调用cityhash的取模函数,进行取模计算。
}

public int numPartitions() {
return numParts;
}
}
还是比较简单,使用时直接按照下面的方式使用即可。

CityHashPartitioner partitioner = new CityHashPartitioner(partnum);

world_rdd.repartitionAndSortWithinPartitions(partitioner)

.saveAsHadoopFile(world_out_path, String.class,

String[].class, TextFileOutFormat.class);

TextFileOutFormat又是一个自定义的文件输出类。

]]>
http://zhwen.org/?feed=rss2&p=951 0
分布式系统的Raft算法(转) http://zhwen.org/?p=928 http://zhwen.org/?p=928#comments Wed, 14 Oct 2015 07:58:03 +0000 http://zhwen.org/?p=928
  • Leader: 处理所有客户端交互,日志复制等,一般一次只有一个Leader.
  • Follower: 类似选民,完全被动
  • Candidate候选人: 类似Proposer律师,可以被选为一个新的领导人。
  • Raft阶段分为两个,首先是选举过程,然后在选举出来的领导人带领进行正常操作,比如日志复制等。下面用图示展示这个过程: 1. 任何一个服务器都可以成为一个候选者Candidate,它向其他服务器Follower发出要求选举自己的请求: raft1 2. 其他服务器同意了,发出OK。 raft2 注意如果在这个过程中,有一个Follower当机,没有收到请求选举的要求,因此候选者可以自己选自己,只要达到N/2 + 1 的大多数票,候选人还是可以成为Leader的。 3. 这样这个候选者就成为了Leader领导人,它可以向选民也就是Follower们发出指令,比如进行日志复制。 raft3 4. 以后通过心跳进行日志复制的通知 raft4 5. 如果一旦这个Leader当机崩溃了,那么Follower中有一个成为候选者,发出邀票选举。 raft5 6. Follower同意后,其成为Leader,继续承担日志复制等指导工作: raft6   值得注意的是,整个选举过程是有一个时间限制的,如下图: raft7 Splite Vote是因为如果同时有两个候选人向大家邀票,这时通过类似加时赛来解决,两个候选者在一段timeout比如300ms互相不服气的等待以后,因为双方得到的票数是一样的,一半对一半,那么在300ms以后,再由这两个候选者发出邀票,这时同时的概率大大降低,那么首先发出邀票的的候选者得到了大多数同意,成为领导者Leader,而另外一个候选者后来发出邀票时,那些Follower选民已经投票给第一个候选者,不能再投票给它,它就成为落选者了,最后这个落选者也成为普通Follower一员了。  

    日志复制

    下面以日志复制为例子说明Raft算法,假设Leader领导人已经选出,这时客户端发出增加一个日志的要求,比如日志是"sally": raft8 2. Leader要求Followe遵从他的指令,都将这个新的日志内容追加到他们各自日志中: raft9 3.大多数follower服务器将日志写入磁盘文件后,确认追加成功,发出Commited Ok: raft10 4. 在下一个心跳heartbeat中,Leader会通知所有Follwer更新commited 项目。 对于每个新的日志记录,重复上述过程。 如果在这一过程中,发生了网络分区或者网络通信故障,使得Leader不能访问大多数Follwers了,那么Leader只能正常更新它能访问的那些Follower服务器,而大多数的服务器Follower因为没有了Leader,他们重新选举一个候选者作为Leader,然后这个Leader作为代表于外界打交道,如果外界要求其添加新的日志,这个新的Leader就按上述步骤通知大多数Followers,如果这时网络故障修复了,那么原先的Leader就变成Follower,在失联阶段这个老Leader的任何更新都不能算commit,都回滚,接受新的Leader的新的更新。 总结:目前几乎所有语言都已经有支持Raft算法的库包,具体可参考:raftconsensus.github.io 英文动画演示Raft

    CAP定理

    分布式Paxos算法 ]]>
    过去, Paxos一直是分布式协议的标准,但是Paxos难于理解,更难以实现,Google的分布式锁系统Chubby作为Paxos实现曾经遭遇到很多坑。

    来自Stanford的新的分布式协议研究称为Raft,它是一个为真实世界应用建立的协议,主要注重协议的落地性和可理解性。

    在了解Raft之前,我们先了解Consensus一致性这个概念,它是指多个服务器在状态达成一致,但是在一个分布式系统中,因为各种意外可能,有的服务器可能会崩溃或变得不可靠,它就不能和其他服务器达成一致状态。这样就需要一种Consensus协议,一致性协议是为了确保容错性,也就是即使系统中有一两个服务器当机,也不会影响其处理过程。

    为了以容错方式达成一致,我们不可能要求所有服务器100%都达成一致状态,只要超过半数的大多数服务器达成一致就可以了,假设有N台服务器,N/2 +1 就超过半数,代表大多数了。

    Paxos和Raft都是为了实现Consensus一致性这个目标,这个过程如同选举一样,参选者需要说服大多数选民(服务器)投票给他,一旦选定后就跟随其操作。Paxos和Raft的区别在于选举的具体过程不同。

    在Raft中,任何时候一个服务器可以扮演下面角色之一:

    1. Leader: 处理所有客户端交互,日志复制等,一般一次只有一个Leader.
    2. Follower: 类似选民,完全被动
    3. Candidate候选人: 类似Proposer律师,可以被选为一个新的领导人。

    Raft阶段分为两个,首先是选举过程,然后在选举出来的领导人带领进行正常操作,比如日志复制等。下面用图示展示这个过程:

    1. 任何一个服务器都可以成为一个候选者Candidate,它向其他服务器Follower发出要求选举自己的请求:

    raft1

    2. 其他服务器同意了,发出OK。

    raft2

    注意如果在这个过程中,有一个Follower当机,没有收到请求选举的要求,因此候选者可以自己选自己,只要达到N/2 + 1 的大多数票,候选人还是可以成为Leader的。

    3. 这样这个候选者就成为了Leader领导人,它可以向选民也就是Follower们发出指令,比如进行日志复制。

    raft3

    4. 以后通过心跳进行日志复制的通知

    raft4

    5. 如果一旦这个Leader当机崩溃了,那么Follower中有一个成为候选者,发出邀票选举。

    raft5

    6. Follower同意后,其成为Leader,继续承担日志复制等指导工作:

    raft6

     

    值得注意的是,整个选举过程是有一个时间限制的,如下图:

    raft7

    Splite Vote是因为如果同时有两个候选人向大家邀票,这时通过类似加时赛来解决,两个候选者在一段timeout比如300ms互相不服气的等待以后,因为双方得到的票数是一样的,一半对一半,那么在300ms以后,再由这两个候选者发出邀票,这时同时的概率大大降低,那么首先发出邀票的的候选者得到了大多数同意,成为领导者Leader,而另外一个候选者后来发出邀票时,那些Follower选民已经投票给第一个候选者,不能再投票给它,它就成为落选者了,最后这个落选者也成为普通Follower一员了。

     

    日志复制

    下面以日志复制为例子说明Raft算法,假设Leader领导人已经选出,这时客户端发出增加一个日志的要求,比如日志是”sally”:

    raft8

    2. Leader要求Followe遵从他的指令,都将这个新的日志内容追加到他们各自日志中:

    raft9

    3.大多数follower服务器将日志写入磁盘文件后,确认追加成功,发出Commited Ok:

    raft10

    4. 在下一个心跳heartbeat中,Leader会通知所有Follwer更新commited 项目。

    对于每个新的日志记录,重复上述过程。

    如果在这一过程中,发生了网络分区或者网络通信故障,使得Leader不能访问大多数Follwers了,那么Leader只能正常更新它能访问的那些Follower服务器,而大多数的服务器Follower因为没有了Leader,他们重新选举一个候选者作为Leader,然后这个Leader作为代表于外界打交道,如果外界要求其添加新的日志,这个新的Leader就按上述步骤通知大多数Followers,如果这时网络故障修复了,那么原先的Leader就变成Follower,在失联阶段这个老Leader的任何更新都不能算commit,都回滚,接受新的Leader的新的更新。

    总结:目前几乎所有语言都已经有支持Raft算法的库包,具体可参考:raftconsensus.github.io

    英文动画演示Raft

    CAP定理

    分布式Paxos算法

    ]]>
    http://zhwen.org/?feed=rss2&p=928 0
    【转载】从Hadoop到Spark的架构实践 http://zhwen.org/?p=918 http://zhwen.org/?p=918#comments Tue, 29 Sep 2015 09:53:35 +0000 http://zhwen.org/?p=918 550947f7c186e 整个系统构建基于Hadoop 2.0(Cloudera CDH4.3),采用了最原始的大数据计算架构。通过日志汇集程序,将不同业务平台的日志汇集到数据中心,并通过ETL将数据进行格式化处理,储存到HDFS。其中,排名和推荐算法的实现都采用了MapReduce,系统中只存在离线批量计算,并通过基于Azkaban的调度系统进行离线任务的调度。 第一个版本的数据中心架构基本上是以满足“最基本的数据利用”这一目的进行设计的。然而,随着对数据价值探索得逐渐加深,越来越多的实时分析需求被提出。与此同时,更多的机器学习算法也亟需添加,以便支持不同的数据挖掘需求。对于实时数据分析,显然不能通过“对每个分析需求单独开发MapReduce任务”来完成,因此引入Hive 是一个简单而直接的选择。鉴于传统的MapReduce模型并不能很好地支持迭代计算,我们需要一个更好的并行计算框架来支持机器学习算法。而这些正是我们一直在密切关注的Spark所擅长的领域——凭借其对迭代计算的友好支持,Spark理所当然地成为了不二之选。2013年9月底,随着Spark 0.8.0发布,我们决定对最初的架构进行演进,引入Hive作为即时查询的基础,同时引入Spark计算框架来支持机器学习类型的计算,并且验证Spark这个新的计算框架是否能够全面替代传统的以MapReduce为基础的计算框架。图2为整个系统的架构演变。 5509482ee11af 在这个架构中,我们将Spark 0.8.1部署在YARN上,通过分Queue,来隔离基于Spark的机器学习任务,计算排名的日常MapReduce任务和基于Hive的即时分析任务。 想要引入Spark,第一步需要做的就是要取得支持我们Hadoop环境的Spark包。我们的Hadoop环境是Cloudera发布的CDH 4.3,默认的Spark发布包并不包含支持CDH 4.3的版本,因此只能自己编译。Spark官方文档推荐用Maven进行编译,可是编译却不如想象中顺利。各种包依赖由于众所周知的原因,不能顺利地从某些依赖中心库下载。于是我们采取了最简单直接的绕开办法,利用AWS云主机进行编译。需要注意的是,编译前一定要遵循文档的建议,设置: export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" 否则,编译过程中就会遇到内存溢出的问题。针对CDH 4.3,mvn build的参数为: mvn -Pyarn-alpha -Phive -Dhadoop.version=2.0.0-cdh4.3.0 -DskipTests clean package 在编译成功所需要的Spark包后,部署和在Hadoop环境中运行Spark则是非常简单的事情。将编译好的Spark目录打包压缩后,在可以运行Hadoop Client的机器上解压缩,就可以运行Spark了。想要验证Spark是否能够正常在目标Hadoop环境上运行,可以参照Spark的官方文档,运行example中的SparkPi来验证:
    ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
        --master yarn-cluster \
        --num-executors 3 \
        --driver-memory 4g \
        --executor-memory 2g \
        --executor-cores 1 \
        --queue sparkqueue \
        lib/spark-examples*.jar \
        10
    完成Spark部署之后,剩下的就是开发基于Spark的程序了。虽然Spark支持Java、Python,但最合适开发Spark程序的语言还是Scala。经过一段时间的摸索实践,我们掌握了Scala语言的函数式编程语言特点后,终于体会了利用Scala开发Spark应用的巨大好处。同样的功能,用MapReduce几百行才能实现的计算,在Spark中,Scala通过短短的数十行代码就能完成。而在运行时,同样的计算功能,Spark上执行则比MapReduce有数十倍的提高。对于需要迭代的机器学习算法来讲,Spark的RDD模型相比MapReduce的优势则更是明显,更何况还有基本的MLlib的支持。经过几个月的实践,数据挖掘相关工作被完全迁移到Spark,并且在Spark上实现了适合我们数据集的更高效的LR等等算法。 全面拥抱Spark 进入2014年,公司的业务有了长足的发展,对比数据中心平台建立时,每日处理的数据量亦翻了几番。每日的排名计算所花的时间越来越长,而基于Hive的即时计算只能支持日尺度的计算,如果到周这个尺度,计算所花的时间已经很难忍受,到月这个尺度则基本上没办法完成计算。基于在Spark上的认知和积累,是时候将整个数据中心迁移到Spark上了。 2014年4月,Spark Summit China在北京举行。抱着学习的目的,我们技术团队也参加了在中国举行的这一次Spark盛会。通过这次盛会,我们了解到国内的很多同行已经开始采用Spark来建造自己的大数据平台,而Spark也变成了在ASF中最为活跃的项目之一。另外,越来越多的大数据相关的产品也逐渐在和Spark相融合或者在向Spark迁移。Spark无疑将会变为一个相比Hadoop MapReduce更好的生态系统。通过这次大会,我们更加坚定了全面拥抱Spark的决心。 基于YARN和Spark,我们开始重新架构数据中心依赖的大数据平台。整个新的数据平台应该能够承载: 1. 准实时的数据汇集和ETL; 2. 支持流式的数据加工; 3. 更高效的离线计算能力; 4. 高速的多维分析能力; 5. 更高效的即时分析能力; 6. 高效的机器学习能力; 7. 统一的数据访问接口; 8. 统一的数据视图; 9. 灵活的任务调度. 整个新的架构充分地利用YARN和Spark,并且融合公司的一些技术积累,架构如图3所示。 在新的架构中,引入了Kafka作为日志汇集的通道。几个业务系统收集的移动设备侧的日志,实时地写入到Kafka 中,从而方便后续的数据消费。 利用Spark Streaming,可以方便地对Kafka中的数据进行消费处理。在整个架构中,Spark Streaming主要完成了以下工作。 1. 原始日志的保存。将Kafka中的原始日志以JSON格式无损的保存在HDFS中。 2. 数据清洗和转换,清洗和标准化之后,转变为Parquet格式,存储在HDFS中,方便后续的各种数据计算任务。 3. 定义好的流式计算任务,比如基于频次规则的标签加工等等,计算结果直接存储在MongoDB中。 55094997160e7 排名计算任务则在Spark上做了重新实现,借力Spark带来的性能提高,以及Parquet列式存储带来的高效数据访问。同样的计算任务,在数据量提高到原来3倍的情况下,时间开销只有原来的1/6。 同时,在利用Spark和Parquet列式存储带来的性能提升之外,曾经很难满足业务需求的即时多维度数据分析终于成为了可能。曾经利用Hive需要小时级别才能完成日尺度的多维度即时分析,在新架构上,只需要2分钟就能够顺利完成。而周尺度上也不过十分钟就能够算出结果。曾经在Hive上无法完成的月尺度多维度分析计算,则在两个小时内也可以算出结果。另外Spark SQL的逐渐完善也降低了开发的难度。 利用YARN提供的资源管理能力,用于多维度分析,自主研发的Bitmap引擎也被迁移到了YARN上。对于已经确定好的维度,可以预先创建Bitmap索引。而多维度的分析,如果所需要的维度已经预先建立了Bitmap索引,则通过Bitmap引擎由Bitmap计算来实现,从而可以提供实时的多维度的分析能力。 在新的架构中,为了更方便地管理数据,我们引入了基于HCatalog的元数据管理系统,数据的定义、存储、访问都通过元数据管理系统,从而实现了数据的统一视图,方便了数据资产的管理。 YARN只提供了资源的调度能力,在一个大数据平台,分布式的任务调度系统同样不可或缺。在新的架构中,我们自行开发了一个支持DAG的分布式任务调度系统,结合YARN提供的资源调度能力,从而实现定时任务、即时任务以及不同任务构成的pipeline。 基于围绕YARN和Spark的新的架构,一个针对数据业务部门的自服务大数据平台得以实现,数据业务部门可以方便地利用这个平台对进行多维度的分析、数据的抽取,以及进行自定义的标签加工。自服务系统提高了数据利用的能力,同时也大大提高了数据利用的效率。 使用Spark遇到的一些坑 任何新技术的引入都会历经陌生到熟悉,从最初新技术带来的惊喜,到后来遇到困难时的一筹莫展和惆怅,再到问题解决后的愉悦,大数据新贵Spark同样不能免俗。下面就列举一些我们遇到的坑。 【坑一:跑很大的数据集的时候,会遇到org.apache.spark.SparkException: Error communicating with MapOutputTracker】 这个错误报得很隐晦,从错误日志看,是Spark集群partition了,但如果观察物理机器的运行情况,会发现磁盘I/O非常高。进一步分析会发现原因是Spark在处理大数据集时的shuffle过程中生成了太多的临时文件,造成了操作系统磁盘I/O负载过大。找到原因后,解决起来就很简单了,设置spark.shuffle.consolidateFiles为true。这个参数在默认的设置中是false的,对于linux的ext4文件系统,建议大家还是默认设置为true吧。Spark官方文档的描述也建议ext4文件系统设置为true来提高性能。 【坑二:运行时报Fetch failure错】 在大数据集上,运行Spark程序,在很多情况下会遇到Fetch failure的错。由于Spark本身设计是容错的,大部分的Fetch failure会经过重试后通过,因此整个Spark任务会正常跑完,不过由于重试的影响,执行时间会显著增长。造成Fetch failure的根本原因则不尽相同。从错误本身看,是由于任务不能从远程的节点读取shuffle的数据,具体原因则需要利用: yarn logs –applicationId appid 查看Spark的运行日志,从而找到造成Fetch failure的根本原因。其中大部分的问题都可以通过合理的参数配置以及对程序进行优化来解决。2014年Spark Summit China上陈超的那个专题,对于如何对Spark性能进行优化,有非常好的建议。 当然,在使用Spark过程中还遇到过其他不同的问题,不过由于Spark本身是开源的,通过源代码的阅读,以及借助开源社区的帮助,大部分问题都可以顺利解决。 下一步的计划 Spark在2014年取得了长足的发展,围绕Spark的大数据生态系统也逐渐的完善。Spark 1.3引入了一个新的DataFrame API,这个新的DataFrame API将会使得Spark对于数据的处理更加友好。同样出自于AMPLab的分布式缓存系统Tachyon因为其与Spark的良好集成也逐渐引起了人们的注意。鉴于在业务场景中,很多基础数据是需要被多个不同的Spark任务重复使用,下一步,我们将会在架构中引入Tachyon来作为缓存层。另外,随着SSD的日益普及,我们后续的计划是在集群中每台机器都引入SSD存储,配置Spark的shuffle的输出到SSD,利用SSD的高速随机读写能力,进一步提高大数据处理效率。 在机器学习方面,H2O机器学习引擎也和Spark有了良好的集成从而产生了Sparkling-water。相信利用Sparking-water,作为一家创业公司,我们也可以利用深度学习的力量来进一步挖掘数据的价值。 结语 2004年,Google的MapReduce论文揭开了大数据处理的时代,Hadoop的MapReduce在过去接近10年的时间成了大数据处理的代名词。而Matei Zaharia 2012年关于RDD的一篇论文“Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing”则揭示了大数据处理技术一个新时代的到来。伴随着新的硬件技术的发展、低延迟大数据处理的广泛需求以及数据挖掘在大数据领域的日益普及,Spark作为一个崭新的大数据生态系统,逐渐取代传统的MapReduce而成为新一代大数据处理技术的热门。我们过去两年从MapReduce到Spark架构的演变过程,也基本上代表了相当一部分大数据领域从业者的技术演进的历程。相信随着Spark生态的日益完善,会有越来越多的企业将自己的数据处理迁移到Spark上来。而伴随着越来越多的大数据工程师熟悉和了解Spark,国内的Spark社区也会越来越活跃,Spark作为一个开源的平台,相信也会有越来越多的华人变成Spark相关项目的Contributor,Spark也会变得越来越成熟和强大。 作者简介:阎志涛,TalkingData研发副总裁,领导研发了公司的数据管理平台(DMP)、数据观象台等产品,并且负责公司大数据计算平台的研发。
    ]]>
    当下,Spark已经在国内得到了广泛的认可和支持:2014年,Spark Summit China在北京召开,场面火爆;同年,Spark Meetup在北京、上海、深圳和杭州四个城市举办,其中仅北京就成功举办了5次,内容更涵盖Spark Core、Spark Streaming、Spark MLlib、Spark SQL等众多领域。而作为较早关注和引入Spark的移动互联网大数据综合服务公司,TalkingData也积极地参与到国内Spark社区的各种活动,并多次在Meetup中分享公司的Spark使用经验。本文则主要介绍TalkingData在大数据平台建设过程中,逐渐引入Spark,并且以Hadoop YARN和Spark为基础来构建移动大数据平台的过程。

    初识Spark

    作为一家在移动互联网大数据领域创业的公司,时刻关注大数据技术领域的发展和进步是公司技术团队必做的功课。而在整理Strata 2013公开的讲义时,一篇主题为《An Introduction on the Berkeley Data Analytics Stack_BDAS_Featuring Spark,Spark Streaming,and Shark》的教程引起了整个技术团队的关注和讨论,其中Spark基于内存的RDD模型、对机器学习算法的支持、整个技术栈中实时处理和离线处理的统一模型以及Shark都让人眼前一亮。同时期我们关注的还有Impala,但对比Spark,Impala可以理解为对Hive的升级,而Spark则尝试围绕RDD建立一个用于大数据处理的生态系统。对于一家数据量高速增长,业务又是以大数据处理为核心并且在不断变化的创业公司而言,后者无疑更值得进一步关注和研究。

    Spark初探

    2013年中期,随着业务高速发展,越来越多的移动设备侧数据被各个不同的业务平台收集。那么这些数据除了提供不同业务所需要的业务指标,是否还蕴藏着更多的价值?为了更好地挖掘数据潜在价值,我们决定建造自己的数据中心,将各业务平台的数据汇集到一起,对覆盖设备的相关数据进行加工、分析和挖掘,从而探索数据的价值。初期数据中心主要功能设置如下所示:

    1. 跨市场聚合的安卓应用排名;

    2. 基于用户兴趣的应用推荐。

    基于当时的技术掌握程度和功能需求,数据中心所采用的技术架构如图1。
    550947f7c186e

    整个系统构建基于Hadoop 2.0(Cloudera CDH4.3),采用了最原始的大数据计算架构。通过日志汇集程序,将不同业务平台的日志汇集到数据中心,并通过ETL将数据进行格式化处理,储存到HDFS。其中,排名和推荐算法的实现都采用了MapReduce,系统中只存在离线批量计算,并通过基于Azkaban的调度系统进行离线任务的调度。

    第一个版本的数据中心架构基本上是以满足“最基本的数据利用”这一目的进行设计的。然而,随着对数据价值探索得逐渐加深,越来越多的实时分析需求被提出。与此同时,更多的机器学习算法也亟需添加,以便支持不同的数据挖掘需求。对于实时数据分析,显然不能通过“对每个分析需求单独开发MapReduce任务”来完成,因此引入Hive 是一个简单而直接的选择。鉴于传统的MapReduce模型并不能很好地支持迭代计算,我们需要一个更好的并行计算框架来支持机器学习算法。而这些正是我们一直在密切关注的Spark所擅长的领域——凭借其对迭代计算的友好支持,Spark理所当然地成为了不二之选。2013年9月底,随着Spark 0.8.0发布,我们决定对最初的架构进行演进,引入Hive作为即时查询的基础,同时引入Spark计算框架来支持机器学习类型的计算,并且验证Spark这个新的计算框架是否能够全面替代传统的以MapReduce为基础的计算框架。图2为整个系统的架构演变。

    5509482ee11af

    在这个架构中,我们将Spark 0.8.1部署在YARN上,通过分Queue,来隔离基于Spark的机器学习任务,计算排名的日常MapReduce任务和基于Hive的即时分析任务。

    想要引入Spark,第一步需要做的就是要取得支持我们Hadoop环境的Spark包。我们的Hadoop环境是Cloudera发布的CDH 4.3,默认的Spark发布包并不包含支持CDH 4.3的版本,因此只能自己编译。Spark官方文档推荐用Maven进行编译,可是编译却不如想象中顺利。各种包依赖由于众所周知的原因,不能顺利地从某些依赖中心库下载。于是我们采取了最简单直接的绕开办法,利用AWS云主机进行编译。需要注意的是,编译前一定要遵循文档的建议,设置:

    export MAVEN_OPTS=”-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m”

    否则,编译过程中就会遇到内存溢出的问题。针对CDH 4.3,mvn build的参数为:

    mvn -Pyarn-alpha -Phive -Dhadoop.version=2.0.0-cdh4.3.0 -DskipTests clean package

    在编译成功所需要的Spark包后,部署和在Hadoop环境中运行Spark则是非常简单的事情。将编译好的Spark目录打包压缩后,在可以运行Hadoop Client的机器上解压缩,就可以运行Spark了。想要验证Spark是否能够正常在目标Hadoop环境上运行,可以参照Spark的官方文档,运行example中的SparkPi来验证:

    ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
        --master yarn-cluster \
        --num-executors 3 \
        --driver-memory 4g \
        --executor-memory 2g \
        --executor-cores 1 \
        --queue sparkqueue \
        lib/spark-examples*.jar \
        10

    完成Spark部署之后,剩下的就是开发基于Spark的程序了。虽然Spark支持Java、Python,但最合适开发Spark程序的语言还是Scala。经过一段时间的摸索实践,我们掌握了Scala语言的函数式编程语言特点后,终于体会了利用Scala开发Spark应用的巨大好处。同样的功能,用MapReduce几百行才能实现的计算,在Spark中,Scala通过短短的数十行代码就能完成。而在运行时,同样的计算功能,Spark上执行则比MapReduce有数十倍的提高。对于需要迭代的机器学习算法来讲,Spark的RDD模型相比MapReduce的优势则更是明显,更何况还有基本的MLlib的支持。经过几个月的实践,数据挖掘相关工作被完全迁移到Spark,并且在Spark上实现了适合我们数据集的更高效的LR等等算法。

    全面拥抱Spark

    进入2014年,公司的业务有了长足的发展,对比数据中心平台建立时,每日处理的数据量亦翻了几番。每日的排名计算所花的时间越来越长,而基于Hive的即时计算只能支持日尺度的计算,如果到周这个尺度,计算所花的时间已经很难忍受,到月这个尺度则基本上没办法完成计算。基于在Spark上的认知和积累,是时候将整个数据中心迁移到Spark上了。

    2014年4月,Spark Summit China在北京举行。抱着学习的目的,我们技术团队也参加了在中国举行的这一次Spark盛会。通过这次盛会,我们了解到国内的很多同行已经开始采用Spark来建造自己的大数据平台,而Spark也变成了在ASF中最为活跃的项目之一。另外,越来越多的大数据相关的产品也逐渐在和Spark相融合或者在向Spark迁移。Spark无疑将会变为一个相比Hadoop MapReduce更好的生态系统。通过这次大会,我们更加坚定了全面拥抱Spark的决心。

    基于YARN和Spark,我们开始重新架构数据中心依赖的大数据平台。整个新的数据平台应该能够承载:

    1. 准实时的数据汇集和ETL;

    2. 支持流式的数据加工;

    3. 更高效的离线计算能力;

    4. 高速的多维分析能力;

    5. 更高效的即时分析能力;

    6. 高效的机器学习能力;

    7. 统一的数据访问接口;

    8. 统一的数据视图;

    9. 灵活的任务调度.

    整个新的架构充分地利用YARN和Spark,并且融合公司的一些技术积累,架构如图3所示。

    在新的架构中,引入了Kafka作为日志汇集的通道。几个业务系统收集的移动设备侧的日志,实时地写入到Kafka 中,从而方便后续的数据消费。

    利用Spark Streaming,可以方便地对Kafka中的数据进行消费处理。在整个架构中,Spark Streaming主要完成了以下工作。

    1. 原始日志的保存。将Kafka中的原始日志以JSON格式无损的保存在HDFS中。

    2. 数据清洗和转换,清洗和标准化之后,转变为Parquet格式,存储在HDFS中,方便后续的各种数据计算任务。

    3. 定义好的流式计算任务,比如基于频次规则的标签加工等等,计算结果直接存储在MongoDB中。

    55094997160e7

    排名计算任务则在Spark上做了重新实现,借力Spark带来的性能提高,以及Parquet列式存储带来的高效数据访问。同样的计算任务,在数据量提高到原来3倍的情况下,时间开销只有原来的1/6。

    同时,在利用Spark和Parquet列式存储带来的性能提升之外,曾经很难满足业务需求的即时多维度数据分析终于成为了可能。曾经利用Hive需要小时级别才能完成日尺度的多维度即时分析,在新架构上,只需要2分钟就能够顺利完成。而周尺度上也不过十分钟就能够算出结果。曾经在Hive上无法完成的月尺度多维度分析计算,则在两个小时内也可以算出结果。另外Spark SQL的逐渐完善也降低了开发的难度。

    利用YARN提供的资源管理能力,用于多维度分析,自主研发的Bitmap引擎也被迁移到了YARN上。对于已经确定好的维度,可以预先创建Bitmap索引。而多维度的分析,如果所需要的维度已经预先建立了Bitmap索引,则通过Bitmap引擎由Bitmap计算来实现,从而可以提供实时的多维度的分析能力。

    在新的架构中,为了更方便地管理数据,我们引入了基于HCatalog的元数据管理系统,数据的定义、存储、访问都通过元数据管理系统,从而实现了数据的统一视图,方便了数据资产的管理。

    YARN只提供了资源的调度能力,在一个大数据平台,分布式的任务调度系统同样不可或缺。在新的架构中,我们自行开发了一个支持DAG的分布式任务调度系统,结合YARN提供的资源调度能力,从而实现定时任务、即时任务以及不同任务构成的pipeline。

    基于围绕YARN和Spark的新的架构,一个针对数据业务部门的自服务大数据平台得以实现,数据业务部门可以方便地利用这个平台对进行多维度的分析、数据的抽取,以及进行自定义的标签加工。自服务系统提高了数据利用的能力,同时也大大提高了数据利用的效率。

    使用Spark遇到的一些坑

    任何新技术的引入都会历经陌生到熟悉,从最初新技术带来的惊喜,到后来遇到困难时的一筹莫展和惆怅,再到问题解决后的愉悦,大数据新贵Spark同样不能免俗。下面就列举一些我们遇到的坑。

    【坑一:跑很大的数据集的时候,会遇到org.apache.spark.SparkException: Error communicating with MapOutputTracker】

    这个错误报得很隐晦,从错误日志看,是Spark集群partition了,但如果观察物理机器的运行情况,会发现磁盘I/O非常高。进一步分析会发现原因是Spark在处理大数据集时的shuffle过程中生成了太多的临时文件,造成了操作系统磁盘I/O负载过大。找到原因后,解决起来就很简单了,设置spark.shuffle.consolidateFiles为true。这个参数在默认的设置中是false的,对于linux的ext4文件系统,建议大家还是默认设置为true吧。Spark官方文档的描述也建议ext4文件系统设置为true来提高性能。

    【坑二:运行时报Fetch failure错】

    在大数据集上,运行Spark程序,在很多情况下会遇到Fetch failure的错。由于Spark本身设计是容错的,大部分的Fetch failure会经过重试后通过,因此整个Spark任务会正常跑完,不过由于重试的影响,执行时间会显著增长。造成Fetch failure的根本原因则不尽相同。从错误本身看,是由于任务不能从远程的节点读取shuffle的数据,具体原因则需要利用:

    yarn logs –applicationId appid

    查看Spark的运行日志,从而找到造成Fetch failure的根本原因。其中大部分的问题都可以通过合理的参数配置以及对程序进行优化来解决。2014年Spark Summit China上陈超的那个专题,对于如何对Spark性能进行优化,有非常好的建议。

    当然,在使用Spark过程中还遇到过其他不同的问题,不过由于Spark本身是开源的,通过源代码的阅读,以及借助开源社区的帮助,大部分问题都可以顺利解决。

    下一步的计划

    Spark在2014年取得了长足的发展,围绕Spark的大数据生态系统也逐渐的完善。Spark 1.3引入了一个新的DataFrame API,这个新的DataFrame API将会使得Spark对于数据的处理更加友好。同样出自于AMPLab的分布式缓存系统Tachyon因为其与Spark的良好集成也逐渐引起了人们的注意。鉴于在业务场景中,很多基础数据是需要被多个不同的Spark任务重复使用,下一步,我们将会在架构中引入Tachyon来作为缓存层。另外,随着SSD的日益普及,我们后续的计划是在集群中每台机器都引入SSD存储,配置Spark的shuffle的输出到SSD,利用SSD的高速随机读写能力,进一步提高大数据处理效率。

    在机器学习方面,H2O机器学习引擎也和Spark有了良好的集成从而产生了Sparkling-water。相信利用Sparking-water,作为一家创业公司,我们也可以利用深度学习的力量来进一步挖掘数据的价值。

    结语

    2004年,Google的MapReduce论文揭开了大数据处理的时代,Hadoop的MapReduce在过去接近10年的时间成了大数据处理的代名词。而Matei Zaharia 2012年关于RDD的一篇论文“Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing”则揭示了大数据处理技术一个新时代的到来。伴随着新的硬件技术的发展、低延迟大数据处理的广泛需求以及数据挖掘在大数据领域的日益普及,Spark作为一个崭新的大数据生态系统,逐渐取代传统的MapReduce而成为新一代大数据处理技术的热门。我们过去两年从MapReduce到Spark架构的演变过程,也基本上代表了相当一部分大数据领域从业者的技术演进的历程。相信随着Spark生态的日益完善,会有越来越多的企业将自己的数据处理迁移到Spark上来。而伴随着越来越多的大数据工程师熟悉和了解Spark,国内的Spark社区也会越来越活跃,Spark作为一个开源的平台,相信也会有越来越多的华人变成Spark相关项目的Contributor,Spark也会变得越来越成熟和强大。

    作者简介:阎志涛,TalkingData研发副总裁,领导研发了公司的数据管理平台(DMP)、数据观象台等产品,并且负责公司大数据计算平台的研发。

    ]]>
    http://zhwen.org/?feed=rss2&p=918 0
    ubuntu中编译安装protobuf记录 http://zhwen.org/?p=909 http://zhwen.org/?p=909#comments Fri, 11 Sep 2015 08:07:49 +0000 http://zhwen.org/?p=909 1.下载protobuf 下载地址:https://github.com/google/protobuf/releases 2.编译protobuf 解压下载的tar.gz包,cd到protobuf的目录下,执行以下指令: ./configure make make check make install 3.检查安装是否成功 protoc --version 如果成功,则会输出版本号信息,例如:libprotoc 2.6.1 如果有问题,则会输出错误内容。 4.错误及解决方法 protoc: error while loading shared libraries: libprotoc.so.9: cannot open shared object file: No such file or directory 错误原因: protobuf的默认安装路径是/usr/local/lib,而/usr/local/lib 不在Ubuntu体系默认的 LD_LIBRARY_PATH 里,所以就找不到该lib 解决方法: 1. 创建文件 /etc/ld.so.conf.d/libprotobuf.conf 包含内容 /usr/local/lib 2. 输入命令 ldconfig 再运行protoc --version 就可以正常看到版本号了
    ]]>
    1.下载protobuf
    下载地址:https://github.com/google/protobuf/releases

    2.编译protobuf
    解压下载的tar.gz包,cd到protobuf的目录下,执行以下指令:
    ./configure
    make
    make check
    make install

    3.检查安装是否成功
    protoc –version
    如果成功,则会输出版本号信息,例如:libprotoc 2.6.1
    如果有问题,则会输出错误内容。

    4.错误及解决方法
    protoc: error while loading shared libraries: libprotoc.so.9: cannot open shared object file: No such file or directory
    错误原因:
    protobuf的默认安装路径是/usr/local/lib,而/usr/local/lib 不在Ubuntu体系默认的 LD_LIBRARY_PATH 里,所以就找不到该lib
    解决方法:
    1. 创建文件 /etc/ld.so.conf.d/libprotobuf.conf 包含内容
    /usr/local/lib

    2. 输入命令
    ldconfig

    再运行protoc –version 就可以正常看到版本号了

    ]]>
    http://zhwen.org/?feed=rss2&p=909 0
    Java通过swig调用C++接口 http://zhwen.org/?p=907 http://zhwen.org/?p=907#comments Thu, 10 Sep 2015 11:09:54 +0000 http://zhwen.org/?p=907 No related posts. ]]> 记录一下过程
    c++代码: swigshape.h
    #ifndef TEST_CODE_SWIG_TEST_SHAPE_H
    #define TEST_CODE_SWIG_TEST_SHAPE_H
    #pragma once

    class Shape {
    public:
    Shape() {
    nshapes++;
    }
    virtual ~Shape() {
    nshapes–;
    };
    double x, y;
    void move(double dx, double dy);
    virtual double area() = 0;
    virtual double perimeter() = 0;
    int nshapes;
    };

    class Circle : public Shape {
    private:
    double radius;
    public:
    Circle(double r) : radius(r) { };
    ~Circle() {};
    virtual double area();
    virtual double perimeter();
    };

    class Square : public Shape {
    private:
    double width;
    public:
    Square(double w) : width(w) { };
    ~Square() {};
    virtual double area();
    virtual double perimeter();
    };

    #endif // TEST_CODE_SWIG_TEST_SHAPE_H

    swig代码:swigshape.i
    /* File : swigshape.i */
    %module swigshape

    %{
    #include “swigshape.h”
    %}

    /* Let’s just grab the original header file here */
    %include “swigshape.h”

    java测试代码: helloswig.java
    public class helloswig {

    static {
    System.out.println(System.getProperty(“java.library.path”));
    System.loadLibrary(“shape”);
    }

    public static void main(String argv[]) {
    //
    System.out.println( “Creating some objects:” );
    Circle c = new Circle(10);
    System.out.println( ” Created circle ” + c );
    Square s = new Square(10);
    System.out.println( ” Created square ” + s );

    c.delete();
    s.delete();

    System.out.println( “Goodbye” );
    }
    }

    scons文件:SConstruct
    VariantDir(“.obj/”, “./”, duplicate=0);
    env = Environment() # Initialize the environment
    env.Append(CCFLAGS = [‘-g’,’-O3′])

    env.Append(CPPDEFINES={‘RELEASE_BUILD’ : ‘1’})
    env.Append(LIBPATH = [‘/usr/local/lib/’])
    env.Append(CPPDEFINES=[‘BIG_ENDIAN’])
    env.Append(CPPPATH = [‘/usr/local/include/’, ‘/usr/lib/jvm/java-8-openjdk-amd64/include/’, ‘/usr/lib/jvm/java-8-openjdk-amd64/include/linux/’])

    env.SharedLibrary(
    target = ‘shape’,
    source = [“.obj/swigshape.cpp”, “.obj/swigshape_wrap.cxx”]
    )

    swigshape_wrap.cxx 由swig编译生成

    编译运行:
    elight@helight-xu:/data/helight_project/xlight/test_code/swig_test$ swig -c++ -java swigshape.i

    helight@helight-xu:/data/helight_project/xlight/test_code/swig_test$ ls
    Circle.java helloswig.java SConstruct Shape.java Square.java swigshape.cpp swigshape.h swigshape.i swigshape.java swigshapeJNI.java swigshape_wrap.cxx

    编译生成静态库:
    helight@helight-xu:/data/helight_project/xlight/test_code/swig_test$ scons
    scons: Reading SConscript files …
    scons: done reading SConscript files.
    scons: Building targets …
    scons: building associated VariantDir targets: .obj
    g++ -o libshape.so -shared .obj/swigshape.os .obj/swigshape_wrap.os -L/usr/local/lib
    scons: done building targets.
    helight@helight-xu:/data/helight_project/xlight/test_code/swig_test$ ls
    Circle.java helloswig.java libshape.so SConstruct Shape.java Square.java swigshape.cpp swigshape.h swigshape.i swigshape.java swigshapeJNI.java swigshape_wrap.cxx

    编译java代码:
    helight@helight-xu:/data/helight_project/xlight/test_code/swig_test$ javac *.java
    Picked up JAVA_TOOL_OPTIONS: -javaagent:/usr/share/java/jayatanaag.jar

    运行代码:
    helight@helight-xu:/data/helight_project/xlight/test_code/swig_test$ export LD_LIBRARY_PATH=./
    helight@helight-xu:/data/helight_project/xlight/test_code/swig_test$ java helloswig
    Picked up JAVA_TOOL_OPTIONS: -javaagent:/usr/share/java/jayatanaag.jar
    ./:/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
    Creating some objects:
    Created circle Circle@677327b6
    Created square Square@14ae5a5
    Goodbye
    helight@helight-xu:/data/helight_project/xlight/test_code/swig_test$

    编译问题:
    helight@helight-xu:/data/helight_project/xlight/test_code/swig_test$ java helloswig
    Picked up JAVA_TOOL_OPTIONS: -javaagent:/usr/share/java/jayatanaag.jar
    Exception in thread “main” java.lang.UnsatisfiedLinkError: no shape in java.library.path
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1865)
    at java.lang.Runtime.loadLibrary0(Runtime.java:870)
    at java.lang.System.loadLibrary(System.java:1122)
    at helloswig.<clinit>(helloswig.java:4)
    主要是jvm运行时候加载我们指定的shape的动态库的时候找不到路径,用下买的方式增加一个搜索目录.
    export LD_LIBRARY_PATH=./

    helight@helight-xu:/data/helight_project/xlight/test_code/swig_test$ java helloswig
    Picked up JAVA_TOOL_OPTIONS: -javaagent:/usr/share/java/jayatanaag.jar
    ./:/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
    Creating some objects:
    Exception in thread “main” java.lang.UnsatisfiedLinkError: shapeJNI.new_Circle(D)J
    at shapeJNI.new_Circle(Native Method)
    at Circle.<init>(Circle.java:38)
    at helloswig.main(helloswig.java:11)
    主要静态库编译的时候没有swig的文件也编译到里面导致,在scons的配置文件中增加:swigshape_wrap.cxx

    g++ -o .obj/shape_wrap.os -c -g -O3 -fPIC -DRELEASE_BUILD=1 -DBIG_ENDIAN -I/usr/local/include shape_wrap.cxx
    shape_wrap.cxx:159:17: fatal error: jni.h: No such file or directory
    #include <jni.h>

    java的jni头文件引用路径没有设置, 以下方式添加.
    env.Append(CPPPATH = [‘/usr/local/include/’, ‘/usr/lib/jvm/java-8-openjdk-amd64/include/’, ‘/usr/lib/jvm/java-8-openjdk-amd64/include/linux/’])

    ]]>
    http://zhwen.org/?feed=rss2&p=907 0
    spark1.3.1单机安装测试备忘 http://zhwen.org/?p=900 http://zhwen.org/?p=900#comments Tue, 09 Jun 2015 03:31:45 +0000 http://zhwen.org/?p=900 1.下载,安装spark和scala:
    下载1.3.1的hadoop2.6版本. spark-1.3.1-bin-hadoop2.6.tgz
    下载到本地之后直接解压即可:
    helight@helight-xu:/data/spark$ tar zxf spark-1.3.1-bin-hadoop2.6.tgz
    下载scala,2.11.6,也是直接解压即可:
    helight@helight-xu:/data/spark$ tar zxf scala-2.11.6.tgz
    安装spark和scala直接配置环境变量即可,可以直接写到 系统环境变量配置文件/etc/profile
    或者写道用户配置文件中~/.bashrc中
    export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/
    export SCALA_HOME=/data/spark/scala-2.11.6
    export PATH=$PATH:$JAVA_HOME/bin:$SCALA_HOME/bin
    以上就是基本配置.
    2.ssh本地互信登录配
    这里和hadoop中的互信配置一样.
    首先在机器上安装openssh-server和openssh-client.
    helight@helight-xu:~/.ssh$ ssh-keygen
    一直回车即可,不要输入任何东西
    helight@helight-xu:~/.ssh$ ls
    id_rsa id_rsa.pub known_hosts
    helight@helight-xu:~/.ssh$ cat id_rsa.pub >authorized_keys
    helight@helight-xu:~/.ssh$ ll
    total 24
    drwx------ 2 helight helight 4096 6月 8 15:06 ./
    drwxr-xr-x 23 helight helight 4096 6月 9 09:59 ../
    -rw------- 1 helight helight 400 6月 8 15:06 authorized_keys
    -rw------- 1 helight helight 1679 6月 8 15:06 id_rsa
    -rw-r--r-- 1 helight helight 400 6月 8 15:06 id_rsa.pub
    -rw-r--r-- 1 helight helight 444 6月 8 15:21 known_hosts
    authorized_keys文件的权限设置为600,如上,这里或需要重新注销登录一下才可以无密码登录
    helight@helight-xu:~/.ssh$ ssh localhost
    Welcome to Ubuntu 15.04 (GNU/Linux 3.19.0-20-generic x86_64)

    * Documentation: https://help.ubuntu.com/

    Last login: Mon Jun 8 15:20:51 2015 from localhost
    helight@helight-xu:~$
    如上面的登录方式,则表示本机无密码登录ok了.
    3.spark启动配置

    3.1 配置spark-env.sh

    Copy一份文件spark-env.sh.template重命名为spark-env.sh,在文件末尾添加

    export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/
    export SCALA_HOME=/data/spark/scala-2.11.6
    export SPARK_MASTER_IP=helight-xu
    export SPARK_WORKER_CORES=1
    export SPARK_WORKER_INSTANCES=1
    export SPARK_WORKER_MEMORY=512M

    可以看到,JAVA_HOMESCALA_HOME都关联上了。

    赋予spark-env.sh可执行权限

    chmod 777 spark-env.sh

     3.2    配置slaves 

    Copy一份slaves.template文件重命名为slaves,添加机器名(或者ip,不过ip没试过)

    # A Spark Worker will be started on each of the machines listed below.

    # localhost
    helight-xu

    3.3配置spark-defaults.conf

    Copy一份spark-defaults.conf.template重命名为spark-defaults.conf,把相关项打开(最后spark.executor.extraJavaOptions这项我目前还不知道使用,待研究)。

    # Default system properties included when running spark-submit.

    # This is useful for setting default environmental settings.

     

    # Example:

    spark.master                    spark://helight-xu:7077
    spark.executor.memory 512m
    spark.eventLog.enabled true
    spark.eventLog.dir          /data/spark/spark-1.3.1-bin-hadoop2.6/logs/
    spark.serializer                 org.apache.spark.serializer.KryoSerializer
    spark.driver.memory       512m

    3.4       配置log4j.properties

    Copy一份log4j.properties.template文件重命名为log4j.properties即可。内容如下:

    # Set everything to be logged to the console

    log4j.rootCategory=INFO, console

    log4j.appender.console=org.apache.log4j.ConsoleAppender

    log4j.appender.console.target=System.err

    log4j.appender.console.layout=org.apache.log4j.PatternLayout

    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

     

    # Settings to quiet third party logs that are too verbose

    log4j.logger.org.eclipse.jetty=WARN

    log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR

    log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO

    log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

    3.5启动spark

    helight@helight-xu:/data/spark/spark_hadoop$ ./sbin/start-all.sh
    starting org.apache.spark.deploy.master.Master, logging to /data/spark/spark-1.3.1-bin-hadoop2.6/sbin/../logs/spark-helight-org.apache.spark.deploy.master.Master-1-helight-xu.out
    helight-xu: starting org.apache.spark.deploy.worker.Worker, logging to /data/spark/spark-1.3.1-bin-hadoop2.6/sbin/../logs/spark-helight-org.apache.spark.deploy.worker.Worker-1-helight-xu.out
    helight@helight-xu:/data/spark/spark_hadoop$
    查看启动进程:
    helight@helight-xu:/data/spark/spark_hadoop$ jps
    Picked up JAVA_TOOL_OPTIONS: -javaagent:/usr/share/java/jayatanaag.jar
    2625 Worker
    2758 Jps
    2410 Master
    helight@helight-xu:/data/spark/spark_hadoop$
     
    helight@helight-xu:/data/spark/spark_hadoop/conf$ ps axu|grep spark
    helight 2410 0.7 3.6 4064160 292772 pts/0 Sl 09:44 0:34 /usr/lib/jvm/java-8-openjdk-amd64//bin/java -cp /data/spark/spark-1.3.1-bin-hadoop2.6/sbin/../conf:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip helight-xu --port 7077 --webui-port 8080
    helight 2625 0.7 3.3 4041960 270248 ? Sl 09:44 0:34 /usr/lib/jvm/java-8-openjdk-amd64//bin/java -cp /data/spark/spark-1.3.1-bin-hadoop2.6/sbin/../conf:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://helight-xu:7077 --webui-port 8081
    helight 3849 0.0 0.0 11176 2648 pts/0 S+ 10:57 0:00 grep --color=auto spark
    helight@helight-xu:/data/spark/spark_hadoop/conf$
    spark的web ui界面:
    http://localhost:8080/
     Screenshot from 2015-06-09 10:59:17
    3.6提交任务:
    ./bin/spark-submit --class org.zhwen.test.spark_test.WordCount --master spark://helight-xu:7077 /data/helight/workspace/spark_test/target/idata-task-project-0.0.1-xu-jar-with-dependencies.jar
     Screenshot from 2015-06-09 11:29:44
    ]]>
    1.下载,安装spark和scala:
    下载1.3.1的hadoop2.6版本. spark-1.3.1-bin-hadoop2.6.tgz
    下载到本地之后直接解压即可:
    helight@helight-xu:/data/spark$ tar zxf spark-1.3.1-bin-hadoop2.6.tgz
    下载scala,2.11.6,也是直接解压即可:
    helight@helight-xu:/data/spark$ tar zxf scala-2.11.6.tgz

    安装spark和scala直接配置环境变量即可,可以直接写到 系统环境变量配置文件/etc/profile
    或者写道用户配置文件中~/.bashrc中
    export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/
    export SCALA_HOME=/data/spark/scala-2.11.6
    export PATH=$PATH:$JAVA_HOME/bin:$SCALA_HOME/bin
    以上就是基本配置.
    2.ssh本地互信登录配
    这里和hadoop中的互信配置一样.
    首先在机器上安装openssh-server和openssh-client.
    helight@helight-xu:~/.ssh$ ssh-keygen
    一直回车即可,不要输入任何东西
    helight@helight-xu:~/.ssh$ ls
    id_rsa id_rsa.pub known_hosts
    helight@helight-xu:~/.ssh$ cat id_rsa.pub >authorized_keys
    helight@helight-xu:~/.ssh$ ll
    total 24
    drwx—— 2 helight helight 4096 6月 8 15:06 ./
    drwxr-xr-x 23 helight helight 4096 6月 9 09:59 ../
    -rw——- 1 helight helight 400 6月 8 15:06 authorized_keys
    -rw——- 1 helight helight 1679 6月 8 15:06 id_rsa
    -rw-r–r– 1 helight helight 400 6月 8 15:06 id_rsa.pub
    -rw-r–r– 1 helight helight 444 6月 8 15:21 known_hosts
    authorized_keys文件的权限设置为600,如上,这里或需要重新注销登录一下才可以无密码登录
    helight@helight-xu:~/.ssh$ ssh localhost
    Welcome to Ubuntu 15.04 (GNU/Linux 3.19.0-20-generic x86_64)

    * Documentation: https://help.ubuntu.com/

    Last login: Mon Jun 8 15:20:51 2015 from localhost
    helight@helight-xu:~$
    如上面的登录方式,则表示本机无密码登录ok了.
    3.spark启动配置

    3.1 配置spark-env.sh

    Copy一份文件spark-env.sh.template重命名为spark-env.sh,在文件末尾添加

    export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/
    export SCALA_HOME=/data/spark/scala-2.11.6
    export SPARK_MASTER_IP=helight-xu
    export SPARK_WORKER_CORES=1
    export SPARK_WORKER_INSTANCES=1
    export SPARK_WORKER_MEMORY=512M

    可以看到,JAVA_HOMESCALA_HOME都关联上了。

    赋予spark-env.sh可执行权限

    chmod 777 spark-env.sh

     3.2    配置slaves 

    Copy一份slaves.template文件重命名为slaves,添加机器名(或者ip,不过ip没试过)

    # A Spark Worker will be started on each of the machines listed below.

    # localhost
    helight-xu

    3.3配置spark-defaults.conf

    Copy一份spark-defaults.conf.template重命名为spark-defaults.conf,把相关项打开(最后spark.executor.extraJavaOptions这项我目前还不知道使用,待研究)。

    # Default system properties included when running spark-submit.

    # This is useful for setting default environmental settings.

     

    # Example:

    spark.master                    spark://helight-xu:7077
    spark.executor.memory 512m
    spark.eventLog.enabled true
    spark.eventLog.dir          /data/spark/spark-1.3.1-bin-hadoop2.6/logs/
    spark.serializer                 org.apache.spark.serializer.KryoSerializer
    spark.driver.memory       512m

    3.4       配置log4j.properties

    Copy一份log4j.properties.template文件重命名为log4j.properties即可。内容如下:

    # Set everything to be logged to the console

    log4j.rootCategory=INFO, console

    log4j.appender.console=org.apache.log4j.ConsoleAppender

    log4j.appender.console.target=System.err

    log4j.appender.console.layout=org.apache.log4j.PatternLayout

    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

     

    # Settings to quiet third party logs that are too verbose

    log4j.logger.org.eclipse.jetty=WARN

    log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR

    log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO

    log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

    3.5启动spark

    helight@helight-xu:/data/spark/spark_hadoop$ ./sbin/start-all.sh
    starting org.apache.spark.deploy.master.Master, logging to /data/spark/spark-1.3.1-bin-hadoop2.6/sbin/../logs/spark-helight-org.apache.spark.deploy.master.Master-1-helight-xu.out
    helight-xu: starting org.apache.spark.deploy.worker.Worker, logging to /data/spark/spark-1.3.1-bin-hadoop2.6/sbin/../logs/spark-helight-org.apache.spark.deploy.worker.Worker-1-helight-xu.out
    helight@helight-xu:/data/spark/spark_hadoop$
    查看启动进程:
    helight@helight-xu:/data/spark/spark_hadoop$ jps
    Picked up JAVA_TOOL_OPTIONS: -javaagent:/usr/share/java/jayatanaag.jar
    2625 Worker
    2758 Jps
    2410 Master
    helight@helight-xu:/data/spark/spark_hadoop$

     

    helight@helight-xu:/data/spark/spark_hadoop/conf$ ps axu|grep spark
    helight 2410 0.7 3.6 4064160 292772 pts/0 Sl 09:44 0:34 /usr/lib/jvm/java-8-openjdk-amd64//bin/java -cp /data/spark/spark-1.3.1-bin-hadoop2.6/sbin/../conf:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master –ip helight-xu –port 7077 –webui-port 8080
    helight 2625 0.7 3.3 4041960 270248 ? Sl 09:44 0:34 /usr/lib/jvm/java-8-openjdk-amd64//bin/java -cp /data/spark/spark-1.3.1-bin-hadoop2.6/sbin/../conf:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/data/spark/spark-1.3.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://helight-xu:7077 –webui-port 8081
    helight 3849 0.0 0.0 11176 2648 pts/0 S+ 10:57 0:00 grep –color=auto spark
    helight@helight-xu:/data/spark/spark_hadoop/conf$
    spark的web ui界面:
    http://localhost:8080/
     Screenshot from 2015-06-09 10:59:17
    3.6提交任务:
    ./bin/spark-submit –class org.zhwen.test.spark_test.WordCount –master spark://helight-xu:7077 /data/helight/workspace/spark_test/target/idata-task-project-0.0.1-xu-jar-with-dependencies.jar
     Screenshot from 2015-06-09 11:29:44
    ]]>
    http://zhwen.org/?feed=rss2&p=900 0
    thrift maven编译运行 http://zhwen.org/?p=896 http://zhwen.org/?p=896#comments Fri, 22 May 2015 04:46:04 +0000 http://zhwen.org/?p=896 dependencies编译,pom.xml中加入build配置:
       <build >
                   < sourceDirectory> src/main/java </ sourceDirectory>
                   < plugins>
                          <!-- Bind the maven-assembly-plugin to the package phase this will create
                               a jar file without the storm dependencies suitable for deployment to a cluster. -->
                          < plugin>
                                < artifactId> maven-assembly- plugin</ artifactId>
                                < configuration>
                                       < descriptorRefs>
                                              < descriptorRef> jar-with-dependencies </descriptorRef >
                                       </ descriptorRefs>
                                       < archive>
                                              < manifest>
                                                     < mainClass></ mainClass >
                                              </ manifest>
                                       </ archive>
                                </ configuration>
                                < executions>
                                       < execution>
                                              < id> make-assembly </id >
                                              < phase> package</ phase >
                                              < goals>
                                                     < goal> single</ goal >
                                              </ goals>
                                       </ execution>
                                </ executions>
                          </ plugin>
                          < plugin>
                                < groupId> org.apache.maven.plugins </groupId >
                                < artifactId> maven-compiler- plugin</ artifactId>
                                < configuration>
                                       < source> 1.7</ source >
                                       < target> 1.7</ target >
                                </ configuration>
                          </ plugin>
     
                          < plugin>
                                < groupId> org.apache.maven.plugins </groupId >
                                < artifactId> mavensurefire-plugin </artifactId >
                                < configuration>
                                       < skip> true</ skip ><!-- 跳过测试用例 -->
                                </ configuration>
                          </ plugin>
     
                   </ plugins>
            </ build>
     
    thrift依赖:
       <dependencies >
        <dependency >
          <groupId > junit</ groupId>
          <artifactId > junit</ artifactId>
          <version >3.8.1 </ version>
          <scope >test </ scope>
        </dependency >
       
            < dependency>
             < groupId> org.apache.thrift </groupId >
             < artifactId> libthrift </artifactId >
             < version> 0.9.2</ version >
            </ dependency>
            < dependency>
                   < groupId> org.slf4j </groupId >
                   < artifactId> slf4j-log4j12 </artifactId >
                   < version> 1.5.8</ version >
            </ dependency>
      </dependencies >
     
    运行jar包:
    E:\idata_spark_work\test\target>java -cp .\test-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.zhwen.test.HelloServerDemo
    E:\idata_spark_work\test\target>java -cp .\test-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.zhwen.test.HelloClientDemo ]]>
    dependencies编译,pom.xml中加入build配置:
       <build >
                   < sourceDirectory> src/main/java </ sourceDirectory>
                   < plugins>
                          <!– Bind the maven-assembly-plugin to the package phase this will create
                               a jar file without the storm dependencies suitable for deployment to a cluster. –>
                          < plugin>
                                < artifactId> maven-assembly- plugin</ artifactId>
                                < configuration>
                                       < descriptorRefs>
                                              < descriptorRef> jar-with-dependencies </descriptorRef >
                                       </ descriptorRefs>
                                       < archive>
                                              < manifest>
                                                     < mainClass></ mainClass >
                                              </ manifest>
                                       </ archive>
                                </ configuration>
                                < executions>
                                       < execution>
                                              < id> make-assembly </id >
                                              < phase> package</ phase >
                                              < goals>
                                                     < goal> single</ goal >
                                              </ goals>
                                       </ execution>
                                </ executions>
                          </ plugin>
                          < plugin>
                                < groupId> org.apache.maven.plugins </groupId >
                                < artifactId> maven-compiler- plugin</ artifactId>
                                < configuration>
                                       < source> 1.7</ source >
                                       < target> 1.7</ target >
                                </ configuration>
                          </ plugin>
     
                          < plugin>
                                < groupId> org.apache.maven.plugins </groupId >
                                < artifactId> maven– surefireplugin </artifactId >
                                < configuration>
                                       < skip> true</ skip ><!– 跳过测试用例 –>
                                </ configuration>
                          </ plugin>
     
                   </ plugins>
            </ build>
     
    thrift依赖:
       <dependencies >
        <dependency >
          <groupId > junit</ groupId>
          <artifactId > junit</ artifactId>
          <version >3.8.1 </ version>
          <scope >test </ scope>
        </dependency >
       
            < dependency>
             < groupId> org.apache.thrift </groupId >
             < artifactId> libthrift </artifactId >
             < version> 0.9.2</ version >
            </ dependency>
            < dependency>
                   < groupId> org.slf4j </groupId >
                   < artifactId> slf4j-log4j12 </artifactId >
                   < version> 1.5.8</ version >
            </ dependency>
      </dependencies >
     
    运行jar包:
    E:\idata_spark_work\test\target>java -cp .\test-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.zhwen.test.HelloServerDemo

    E:\idata_spark_work\test\target>java -cp .\test-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.zhwen.test.HelloClientDemo

    ]]>
    http://zhwen.org/?feed=rss2&p=896 0