0%

Spark相对于Hadoop来说一个最大的优势就是可以支持迭代运算在内存当中进行,正如Spark官网首页上列出的第一个优势“Speed”,官方是这样描述的:

Speed
Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk. Apache Spark has an advanced DAG
execution engine that support acyclic data flow and in-memory
computing.

同时官方还给了这样一张图来说明对于迭代式计算的逻辑回归而言,Spark比Hadoop快了可不是一点半点。

Logistic regression in Hadoop and Spark

对于in-memory的Spark来说,了解一下其内存管理还是十分重要的,因为Spark的in-memory的计算特性,其对内存的消耗还是很巨大的,如果对Spark的内存管理不够了解便不能充分利用所有的内存资源,并且很容易导致内存不够用时中间数据被缓存到了磁盘当中影响计算的速度。

Spark内存管理接口

Spark为存储内存和执行内存的管理提供了统一的抽象类——MemoryManager,同一个Executor内的任务都调用这个抽象类的方法来申请或释放内存,几个关键的内存管理接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean

def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean

private[memory]
def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long

private[memory]
def releaseExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Unit = synchronized {
memoryMode match {
case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
}
}

private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}

def releaseStorageMemory(numBytes: Long): Unit = synchronized {
storageMemoryPool.releaseMemory(numBytes)
}

final def releaseAllStorageMemory(): Unit = synchronized {
storageMemoryPool.releaseAllMemory()
}

final def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
releaseStorageMemory(numBytes)
}

Spark的内存系统随着Spark版本的发展具有非常多的变化,1.6.0版本之后新的内存管理模块的实现类为UnifiedMemoryManager,1.6.0版本之前采用的静态管理StaticMemoryManager方式仍被保留,称为Legacy模式。Legacy模式默认是关闭的,需要通过增加一个配置参数spark.memory.useLegacyMode=true来开启。

内存管理

静态内存管理

首先来简略地了解一下Spark 1.6.0版本之前的内存管理模型,对于一个Executor,内存一般由3个部分构成:

  1. Executor Memory,这片内存区域是为了解决shuffle、joins、sorts以及aggregations过程中为了避免繁琐的IO需要的buffer,可以通过参数spark.shuffle.memoryFraction配置,其默认值为0.2。
  2. Storage Memory,这片内存区域是用于RDD的cache、persist以及broadcasts和task results的存储,可以通过参数spark.storage.memoryFraction配置,默认值为0.6.
  3. Other Memory,给系统预留的,因为程序本身运行也是需要内存的,其默认比例为0.2。
    除此之外,为了防止OOM,一般都会有个safetyFraction,这种内存分配机制最大的问题就是其静态性,每个部分都不能超过自己的上限,规定了多少就是多少,这在Storage Memory和Executor Memory当中尤为严重。借用别人的一张图片能够很清楚的说明静态内存的分配方式:
    阅读全文 »

groupByKey

在编写处理大规模数据的Spark代码的时候遇到一个问题,对大规模数据进行groupByKey操作的时候时间非常长,而且很容易出现OOM的情况。这其中的主要原因有几点,一是groupByKey会造成大量的数据shuffle,大量的IO会影响程序的运行时间;二是每一个key下对应的数据非常不均匀,有的key对应的key非常多,在某个executor上的数据可能会超出内存大小,造成OOM的情况。

groupByKey这个API事实上不是一个非常高效的API,会造成大量的数据搬移,效率不高。在我的项目当中,我实际上要做的任务是将我的数据按照染色体序号进行group,将染色体号为1的记录归并到一起进行处理,将染色体序号为2的记录归并到一起处理,等等。由于记录的数量非常庞大,并且不同染色体号的记录数量又和染色体的长度相关,是十分不均匀的,例如1号染色体对应的记录数量就远远大于Y染色体,groupByKey会消耗非常多的时间在数据迁移以及数据的序列化反序列化上,同时,每一个group数据量的不均匀性又会导致某些executor上内存压力过大,出现OOM的情况。

partitionBy

其实对于我的这种情况使用partitionBy会更为适合,可以自己实现一个partitioner,也可以直接使用HashPartitioner,以染色体号作为key进行重新分区,然后再使用mapPartition在每个分区内处理不同染色体,partitionBy的效率会比groupByKey高很多。

repartitionAndSortWithinPartitions

在我的项目当中,对每个染色体号对应的记录进行的数据处理包括对数据的排序操作,在数据量很大的时候,这个排序操作也会很耗费时间,但是在partitionBy对数据进行shuffle的时候,已经对数据进行过遍历了,之后再次排序需要又一次遍历数据,十分浪费时间。于是,我又找到了一个非常适合我的程序的一个新的分区接口repartitionAndSortWithinPartitions,首先看下这个接口的使用方法:

1
2
3
4
5
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys.

This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

为了使用这个接口,我就必须将我自己定义的数据类型定义一个排序规则,即定义一个Ordering,具体的定义方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
object MySAMRecord {
implicit val samRecordOrdering: Ordering[MySAMRecord] = new Ordering[MySAMRecord] {
override def compare(x: MySAMRecord, y: MySAMRecord): Int = {
if (x.referenceIndex != y.referenceIndex) x.referenceIndex - y.referenceIndex
else {
if (x.startPos != y.startPos) {
x.startPos - y.startPos
} else {
if (new String(x.originalStrByteArr) > new String(y.originalStrByteArr)) 1 else -1
}
}
}
}
}

同时,由于repartitionAndSortWithinPartitions接口的排序是按照key进行的,我就不能使用原有的HashPartitioner进行分区,需要自己定义Partitioner,使得我自己定义的数据类型作为key值时,仍然能够按照染色体序号进行分区,我自己的Partitioner函数定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.apache.spark.Partitioner

class MySAMRecordPartitioner(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts

override def getPartition(key: Any): Int = {
val record: MySAMRecord = key.asInstanceOf[MySAMRecord]

val code = record.regionId % numPartitions
if (code < 0) {
code + numPartitions
} else {
code
}
}

override def equals(other: Any): Boolean = other match {
case records: MySAMRecordPartitioner =>
records.numPartitions == numPartitions
case _ =>
false
}

override def hashCode(): Int = numPartitions
}

当我的数据输入是RDD[MySAMRecord]时,为了使用repartitionAndSortWithinPartitions,需要将输入转换成键值对形式,RDD[(MySAMRecord, None)],repartition之后,每个partition中的数据就会被自动排序完成,从源码注释当中我们也可以看到,这样子操作是比repartition之后再在每个分区中sorting是要快的,因为这个排序是在shffle的同时进行的,对数据的遍历在shffle的时候只进行了一次,效率当然会高很多。之后再使用mapPartition对每个分区进行操作的时候,每个partition所对应的Iterable就已经是有序的了,不需要再进行新的排序操作。

总结

  1. 在处理的数据比较大的时候,尽量不要使用groupByKey操作,这个操作的效率很低,可以使用reduceByKey(当需要后续操作的时候)来代替,也可以使用partitionBy,repartition等接口进行替代。
  2. 在处理的数据分区之后,如果还要进行排序操作的话,可以尝试使用repartitionAndSortWithinPartitions,这个接口能够在shuffle数据的同时进行排序,减少遍历数据的次数,节省程序运行时间。

参考资料

  1. https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

最近将Spark集群部署到了Mesos之上,运行Spark程序时,能够方便的在Mesos的sandbox中查看程序运行时产生的stderr和stdout,并且这两个文件可以动态加载,不像在Spark管理界面上需要手动点击load more才能加载出来,十分方便。但是也发现了一个问题,当调试Spark代码时手动kill掉spark-submit的进程,或者Ctrl-c时,spark的程序已经停止了,但是在Mesos的管理界面上有时候还会看到有Job在运行,会占用资源,影响之后提交的程序的资源获取。但是Mesos网页管理界面上又没有像Spark管理界面上的kill按钮,就一时不知道如何kill掉这些实际已经不在运行的Job。在Stack Overflow上查到了几种方法,其中我觉得比较好用的一个贴在这里方便之后查看。

1
curl -XPOST http://mesos-master-ip-address:5050/master/teardown -d 'frameworkId=<frameId-you-want-to-kill>'

这里使用到了Mesos的HTTP Endpoint teardown,有很多人可能会查到是shutdown,但是从Mesos某个版本之后,已经从shutdown改为了teardown,具体的有关HTTP Endpoint的官方文档说明在参考文档当中。

指令发送了一个POST请求给Mesos的一个HTTP Endpoint,要求关闭指定frameworkId的framework。可以将这个指令写在~/.bashrc中,方便之后使用,如下:

1
killmesostask(){ curl -XPOST http://gpu-server5:5050/master/teardown -d 'frameworkId='$@''; } ;

之后就可以使用killmesostask <frameworkId>的方式来kill指定Id的framework,需要注意的是这个指令一定要在mesos的master所在的服务器上运行,如果使用了zookeeper,master有时候会发生变化,需要在每个运行mesos-master进程的机器上都加上这个语句,需要kill的时候在对应那个时刻为master的机器上运行指令。

阅读全文 »

1.安装iTerm2

2.安装oh-my-zsh

1
sh -c "$(curl -fsSL https://raw.githubusercontent.com/robbyrussell/oh-my-zsh/master/tools/install.sh)"
  • 使用该指令能自动切换bash为zsh
  • 将oh-my-zsh的主题切换为ys
  • brew install autojump,在.zshrc中添加plugins中的autojump,并添加[[ -s ~/.autojump/etc/profile.d/autojump.sh ]] && . ~/.autojump/etc/profile. d/autojump.sh

3.安装dircolors-solarized

  • 安装之后能够使得终端中使用ls指令具有彩色的输出
  • brew install coreutils
  • git clone git://github.com/seebi/dircolors-solarized.git,将clone文件夹中的dircolors.ansi-dark复制到~/.dir_colors
  • 在.zshrc中添加如下内容
1
2
3
4
5
if brew list | grep coreutils > /dev/null ; then
PATH="$(brew --prefix coreutils)/libexec/gnubin:$PATH"
alias ls='gls -FHG --color=auto'
eval `gdircolors -b $HOME/.dir_colors`
fi

4.修改vim主题

  • git clone git://github.com/altercation/solarized.git
  • mkdir -p ~/.vim/colors
  • cp solarized/vim-colors-solarized/colors/solarized.vim ~/.vim/colors
  • vim ~/.vimrc,添加如下内容
1
2
3
4
syntax enable
set background=dark
colorscheme solarized
set nu

安装python的library的时候经常需要sudo权限,原因是许多机器上安装的python都是使用apt-get(ubuntu)、yum(CentOS)安装的,在这种情况下python被安装到了root用户目录下,再使用pip安装各种库的时候,有时候库需要被安装到这些root用户目录下,就会出现权限问题。解决这个问题的方法是通过源码安装python到普通用户目录下,再通过源码安装setuptools和pip便能在普通用户下安装python的各种库。

1.编译安装python

下载python源码,https://www.python.org/downloads/source/
这里选择python2.7.13进行测试,https://www.python.org/downloads/release/python-2713/
下载xz压缩格式的源码,使用tar -xvf Python-2.7.13.tar.xz解压后进入目录,./configure --prefix=/home/spark/Softwares/python2通过--prefix指定安装目录为普通用户目录,make进行编译,make install进行安装。之后在~/.bashrc文件内添加PATH指定python的安装路径export PATH=/home/spark/Softwares/python2/bin:$PATH,然后source ~/.bashrc,此时python便被指定为编译安装的python了。

2.编译安装setuptools和pip

下载setuptools源码,https://pypi.python.org/pypi/setuptools

下载zip压缩格式源码,使用unzip setuptools-36.2.7.zip进行解压,进入目录之后使用python setup.py install进行安装,此时需要确保python指令指向的是之前编译安装的python。

下载pip源码,https://pypi.python.org/pypi/pip

下载tar.gz格式的源码,使用tar -zxvf pip-9.0.1.tar.gz进行解压,进入目录后,首先使用python setup.py build进行编译,然后使用python setup.py install进行安装,这之后会发现在编译安装python的路径的bin目录中会有pip和setuptools程序。由于之前已经添加该bin目录到PATH环境变量当中,此时使用pip安装即为普通用户目录下的pip,可以不需要sudo权限安装各种library。

后台断点续传大文件

使用rync指令进行断点续传

1
rsync -P --rsh=ssh slurm@XXX.XXX.XXX.XXX:/path/to/file.tar.gz file.tar.gz

以上指令表示将远端的file.tar.gz文件传输到近端,输入以上指令之后需要输入密码,此时输入密码便开始进行传输。此时传输的程序是运行在前台的,我们需要将该程序转入后台运行。此时使用Ctrl+z挂起该传输进程,再使用jobs指令能够看到挂起的进程的序号以及状态,例如序号为1,接下来使用bg %1便能将这个序号为1的挂起任务重新再后台进行执行。但是此时如果退出登录该传输进程还是会被kill,原因是该进程此时属于这个登录用户,该用户下线之后该进程便会被自动注销关闭,所以需要将该进程的所有者由当前登录的用户移交给root用户,使用disown -h %1便能达到这个效果,此时该进程便能顺利的在后台进行传输了,如果需要中断,只能使用手动查找到传输的进程,使用kill指令进行关闭。

使用ssh协议来进行rsync操作,能够在传输的时候暂停或者因为网络故障等中断传输以后下次可以继续传输,传输过程中,被传输的文件会被存储为一个隐藏文件,文件传输完毕后该隐藏文件会被转换成普通文件。如果传输过程中断,该隐藏文件也会被转换为普通文件,下次继续传输时会再转换为隐藏文件进行传输。

断点传输目录

使用方法基本一致

1
rsync -Pr --rsh=ssh source/dirname slurm@XXX.XXX.XXX.XXX:/home/slurm/dest/dirname

断点续传目录的时候,目录中的文件会被依次以上面的那种方法的形式进行传输,传输如果中断,之前已经上传完毕的文件会被进行完整性检查,如果确认是完整传输的文件,就不会被继续传输了。

创建云主机集群

登录清华大学EasyStack,概况中可以看到目前集群资源的使用情况,点击左侧计算资源-云主机,选择创建云主机对云主机进行创建。填入云主机名字,例如canu,选择从镜像中安装云主机,镜像选择CentOS6.5,之后对配置进行设置,对于canu项目来说建议选择16core-64GB内存-500GB磁盘的配置,网络选择share_net,安全组中将joey安全策略加入,密码设置自己密码即可,之后点击创建云主机。由于本次需要搭建一个集群,我们先完整配置好一台机器再通过该机器建立云主机快照,根据该快照来创建其他节点(即将这台机器一模一样复制多台出来)。主机创建完毕大概需要20分钟左右。

创建完毕之后需要对该主机绑定公网ip,公网ip可以到网络资源中进行申请,选择100Mbps的公网。选中之前创建的那台云主机,在更多种选择绑定公网ip,将之前申请的公网ip绑定上去。之后便可以通过vpn来连接该主机,由于windows连接清华大学vpn比较麻烦,已经在gpu-server5上打开了与清华大学的vpn连接,可以首先连接gpu-server5再连接清华大学集群。

搭建slurm环境

  1. 刚创建的云主机只有root用户,一般用root用户操作有许多不便之处,需要先创建slurm用户来进行操作。(#表示需要sudo权限或root用户的操作)
1
# useradd slurm

之后需要增加slurm用户的sudo权限

1
2
3
# chmod -v u+w /etc/sudoers
# yum install vim git -y
# vim /etc/sudoers

模仿root用户的权限控制加入slurm ALL=(ALL) ALL
之后需要恢复/etc/sudoers的权限,使用# chmod -v u-w /etc/sudoers

接下来su slurm进行操作

阅读全文 »

SparkFlumeProtocol

1
2
Error:(45, 66) not found: type SparkFlumeProtocol
val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {

这个问题是由于flume-sink所需要的部分源文件idea不会自动下载,所以编译时不能通过。

解决方式

在Intellij IDEA里面:

  • 打开View -> Tool Windows -> Maven Projects
  • 右击Spark Project External Flume Sink
  • 点击Generate Sources and Update Folders
    随后,Intellij IDEA会自动下载Flume Sink相关的包

SqlBaseParser

1
2
Error:(36, 45) object SqlBaseParser is not a member of package org.apache.spark.sql.catalyst.parser
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._

这个问题和之前的问题类似,是idea不会自动下载部分catalyst相关的源文件,导致编译时不能通过。

解决办法

在Intellij IDEA里面:

  • 打开View -> Tool Windows -> Maven Projects
  • 右击Spark Project Catalyst
  • 点击Generate Sources and Update Folders
    随后,Intellij IDEA会自动下载Catalyst相关的包

总结

IDEA编译spark源码出错大多都是不能正确下载相关的源文件,在Maven的管理菜单内选择相对应的库选择下载源码并更新即可解决大多数问题。

spark build error集合:
https://www.mail-archive.com/search?l=user@spark.apache.org&q=subject:%22Build+error%22&o=newest&f=1

1. 配置双网卡静态IP信息

sudo vim /etc/sysconfig/network-scripts/ifcfg-enp15s0

配置该网卡为外网IP,需要添加的内容如下

1
2
3
4
5
6
BOOTPROTO=static
ONBOOT=yes
DNS1=202.114.0.242
IPADDR=XXX.XXX.XXX.XXX
PREFIX=23
GATEWAY=115.156.163.254

sudo vim /etc/sysconfig/network-scripts/ifcfg-eno1

配置该网卡转发enp15s0的网络包,需要添加的内容如下

1
2
3
4
5
6
BOOTPROTO=static
ONBOOT=yes
DNS1=202.114.0.242
IPADDR=10.0.0.1
PREFIX=24
GATEWAY=10.0.0.1

2. 配置系统内核实现ipv4转发

sudo vim /etc/sysctl.conf

添加如下内容

1
2
# Controls IP packet forwarding
net.ipv4.ip_forward = 1

如果需要不重启实现ipv4转发功能可以使用如下指令

sysctl -w net.ipv4.ip_forward=1

3. 修改iptables转发规则

运行如下指令

sudo iptables -t nat -A POSTROUTING -s 10.0.0.0/24 -o enp15s0 -j SNAT --to XXX.XXX.XXX.XXX

iptables -A OUTPUT -p tcp --dport 53 -j ACCEPT

iptables -A OUTPUT -p udp --dport 53 -j ACCEPT

保存iptables配置的结果防止重启失效

sudo iptables-save

4. 其他一些关于网络的问题

  • 多网卡机器1号网卡作为内网网段,2号网卡作为外网网段不能上网的问题,可以通过修改默认路由来实现。一般来说,默认路由为1号网卡(如果没有配置的话),设置路由使用route指令,route指令说明如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Usage: route [-nNvee] [-FC] [<AF>]           List kernel routing tables
route [-v] [-FC] {add|del|flush} ... Modify routing table for AF.

route {-h|--help} [<AF>] Detailed usage syntax for specified AF.
route {-V|--version} Display version/author and exit.

-v, --verbose be verbose
-n, --numeric don't resolve names
-e, --extend display other/more information
-F, --fib display Forwarding Information Base (default)
-C, --cache display routing cache instead of FIB

<AF>=Use -4, -6, '-A <af>' or '--<af>'; default: inet
List of possible address families (which support routing):
inet (DARPA Internet) inet6 (IPv6) ax25 (AMPR AX.25)
netrom (AMPR NET/ROM) ipx (Novell IPX) ddp (Appletalk DDP)
x25 (CCITT X.25)

使用如下指令来添加默认路由:

1
sudo route add default gw 10.0.0.1

将默认网关设置为10.0.0.1这个上网的网段即可上网

其他MPI中会用到的函数数据类型:

  • MPI_Barrier - int MPI_Barrier( MPI_Comm comm )阻塞执行到当前位置的处理程序,直到communicator中所有处理程序都到达该位置
  • MPI_Wtime - double MPI_Wtime( void )返回调用这个函数的进程节点从创建开始经过的时间
  • MPI_Type_size - int MPI_Type_size(MPI_Datatype datatype, int *size)返回datatype所占有的字节数
  • MPI_Send - int MPI_Send(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)MPI发送数据函数,buf为发送缓冲区的起始地址,count为将发送的数据的个数,datatype为发送数据的数据类型,dest为目的进程的标识号,tag为消息标志,comm为通信域,该函数的返回值为MPI_SUCCESS时表示发送成功
  • MPI_Irecv = int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status)MPI接受数据函数,buf为接受缓冲区的起始地址,count为最多可接受的数据个数,datatype为接收数据的数据类型,source为接受数据的来源即发送数据的进程的进程标识号,tag为消息标识,与相应的发送操作的表示相匹配,comm为本进程和发送进程所在的通信域,status为返回状态
    阅读全文 »