高性能分布式执行框架——Ray管理

参考资料

  1. Ray论文:Real-Time Machine Learning: The Missing
    Pieces
  2. Ray开发手册:http://ray.readthedocs.io/en/latest/index.html
  3. Ray源代码:https://github.com/ray-project/ray

 

Ray方今还地处实验室阶段,最新版本为0.2.2版本。固然Ray自称是面向AI应用的分布式统计框架,不过它的架构具有通用的分布式总括抽象。本文对Ray举行简要的牵线,援救大家更快地通晓Ray是何许,如有描述不当的地方,欢迎不吝指正。

以上代码代表劳务端定时运算processToServerSent伊夫nt再次来到ServerSent伊芙nt类型结果后公告给所有订阅的客户端。大家用一个函数processToServerSent伊夫nt模拟重复运算的事务功用:

四、安装Ray

借使只是使用Ray,可以采取如下命令直接设置。

pip intall ray

倘诺必要编译Ray的流行源码举办设置,根据如下步骤进行(马克斯OS):

# 更新编译依赖包
brew update
brew install cmake pkg-config automake autoconf libtool boost wget
pip install numpy cloudpickle funcsigs click colorama psutil redis flatbuffers cython --ignore-installed six
# 下载源码编译安装
git clone https://github.com/ray-project/ray.git
cd ray/python
python setup.py install
# 测试
python test/runtest.py

# 安装WebUI需要的库[可选]
pip install jupyter ipywidgets bokeh

# 编译Ray文档[可选]
cd ray/doc
pip install -r requirements-doc.txt
make html
open _build/html/index.html

自己在MacOS上安装jupyter时,境遇了Python的setuptools库不可能擢升的情景,原因是MacOS的安全性设置问题,可以行使如下格局缓解:

  1. 重启电脑,启动时按住Command+R进入Mac保护方式。
  2. 打开命令行,输入指令csrutils disable关闭系统安全策略。
  3. 重启电脑,继续安装jupyter。
  4. 设置到位后,重复如上的法门执行csrutils enable,再度重启即可。

跻身PythonShell,输入代码本地启动Ray:

import ray
ray.init()

浏览器内开辟WebUI界面如下:

管理 1

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.model.HttpMethods
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model._

object SSEClient {

  def downloadFiles(file: String) = {
    Thread.sleep(3000)   //process delay
    if (file != "")
      println(s"Try to download $file")
  }

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val mat    = ActorMaterializer()

    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
    import system.dispatcher

    Http()
      .singleRequest(Get("http://localhost:8011/events"))
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
      .foreach(_.runForeach(se => downloadFiles(se.data)))

    scala.io.StdIn.readLine()
    println("do some thing ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
    ).onSuccess {
      case msg => println(msg)
    }

    scala.io.StdIn.readLine()
    println("do some other things ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
    ).onSuccess {
      case msg => println(msg)
    }


    scala.io.StdIn.readLine()
    system.terminate()
  }
}

一、不难先河

先是来看一下最简易的Ray程序是何等编写的。

# 导入ray,并初始化执行环境
import ray
ray.init()

# 定义ray remote函数
@ray.remote
def hello():
    return "Hello world !"

# 异步执行remote函数,返回结果id
object_id = hello.remote()

# 同步获取计算结果
hello = ray.get(object_id)

# 输出计算结果
print hello

在Ray里,通过Python注解@ray.remote定义remote函数。使用此注明注解的函数都会自带一个默许的艺术remote,通过此格局发起的函数调用都是以提交分布式任务的法子异步执行的,函数的重临值是一个目的id,使用ray.get放开操作可以一起获取该id对应的对象。精通Java里的Future机制的话对此应当并不陌生,或许会有人狐疑那和平凡的异步函数调用没什么大的分化,可是这里最大的差距是,函数hello是分布式异步执行的。

remote函数是Ray分布式总计抽象中的大旨概念,通过它开发者拥有了动态定制计算信赖(职分DAG)的能力。比如:

@ray.remote
def A():
    return "A"

@ray.remote
def B():
    return "B"

@ray.remote
def C(a, b):
    return "C"

a_id = A.remote()
b_id = B.remote()
c_id = C.remote(a_id, b_id)
print ray.get(c_id)

事例代码中,对函数A、B的调用是一点一滴并行执行的,可是对函数C的调用依赖于A、B函数的回到结果。Ray可以有限支撑函数C必要等待A、B函数的结果真的总括出来后才会履行。若是将函数A、B、C类比为DAG的节点的话,那么DAG的边就是函数C参数对函数A、B总结结果的看重性,自由的函数调用格局允许Ray可以无限制地定制DAG的构造和总计依赖关系。其它,提及一点的是Python的函数可以定义函数具有七个重返值,那也使得Python的函数更自然具备了DAG节点多入和多出的特征。

管理 2

 我的博客即将联合至腾讯云+社区。邀我们一同入驻http://cloud.tencent.com/developer/support-plan

Ray是UC 伯克利(Berkeley)(Berkeley)RISELab新生产的高性能分布式执行框架,它接纳了和观念分布式计算系统不等同的架构和对分布式计算的抽象格局,具有比斯帕克(Spark)更美好的测算性能。

 

4. @ray.remote

Ray中应用表明@ray.remote可以声雀巢(Nestle)个remote
function。remote函数时Ray的中坚任务调度单元,remote函数定义后会即刻被体系化存储到RedisServer中,并且分配了一个唯一的ID,那样就有限支持了集群的持有节点都得以见到那一个函数的定义。

而是,那样对remote函数定义有了一个诡秘的渴求,即remote函数内即使调用了其他的用户函数,则必须超前定义,否则remote函数无法找到相应的函数定义内容。

remote函数内也足以调用其他的remote函数,Driver和Slave每回调用remote函数时,其实都是向集群提交了一个计量职分,从此间也得以看出Ray的分布式总结的自由性。

Ray中调用remote函数的主要流程如下:

  1. 调用remote函数时,首先会成立一个职务目标,它富含了函数的ID、参数的ID或者值(Python的主旨指标直接传值,复杂对象会先经过ray.put()操作存入ObjectStore然后赶回ObjectID)、函数重回值对象的ID。
  2. 义务目的被发送到本地调度器。
  3. 地点调度器决定职责目的是在地点调度照旧发送给全局调度器。倘诺义务目标的依靠(参数)在地头的ObejctStore已经存在且当地的CPU和GPU总括资源丰盛,那么地点调度器将任务分配给当地的WorkerProcess执行。否则,职分目的被发送给全局调度器并蕴藏到义务表(TaskTable)中,全局调度器依照当前的职责状态新闻决定将职责发给集群中的某一个地面调度器。
  4. 本地调度器收到职分目的后(来自地点的职责依旧全局调度分配的职分),会将其放入一个职分队列中,等待计算资源和地点依赖知足后分配给WorkerProcess执行。
  5. Worker收到任务目标后进行该职务,并将函数重返值存入ObjectStore,并更新Master的目的表(ObjectTable)音信。

@ray.remote诠释有一个参数num_return_vals用以申明remote函数的再次来到值个数,基于此已毕remote函数的多重回值机制。

@ray.remote(num_return_vals=2)
def f():
    return 1, 2

x_id, y_id = f.remote()
ray.get(x_id)  # 1
ray.get(y_id)  # 2

@ray.remote申明的另一个参数num_gpus能够为天职指定GPU的资源。使用内置函数ray.get_gpu_ids()可以获得当前职分可以利用的GPU音讯。

@ray.remote(num_gpus=1)
def gpu_method():
    return "This function is allowed to use GPUs {}.".format(ray.get_gpu_ids())
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
         complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map( _ => processToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }

5. ray.wait()

ray.wait()操作匡助批量的义务等待,基于此可以兑现四回性取得三个ObjectID对应的数额。

# 启动5个remote函数调用任务
results = [f.remote(i) for i in range(5)]
# 阻塞等待4个任务完成,超时时间为2.5s
ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)

上述例子中,results包蕴了5个ObjectID,使用ray.wait操作可以一贯守候有4个职分成功后赶回,并将不辱任务的多寡对象放在第三个list类型重回值内,未形成的ObjectID放在首个list重返值内。如若设置了晚点时间,那么在逾期时间为止后仍未等到预期的重临值个数,则已过期落成时的重临值为准。

 

2. ray.put()

使用ray.put()可以将Python对象存入本地ObjectStore,并且异步再次回到一个唯一的ObjectID。通过该ID,Ray可以访问集群中任一个节点上的目标(远程对象通过查看Master的靶子表得到)。

目的一旦存入ObjectStore便不可改变,Ray的remote函数可以将直接将该对象的ID作为参数传入。使用ObjectID作为remote函数参数,可以有效地回落函数参数的写ObjectStore的次数。

@ray.remote
def f(x):
    pass

x = "hello"

# 对象x往ObjectStore拷贝里10次
[f.remote(x) for _ in range(10)]

# 对象x仅往ObjectStore拷贝1次
x_id = ray.put(x)
[f.remote(x_id) for _ in range(10)]

服务端:

1. ray.init()

在PythonShell中,使用ray.init()可以在地头启动ray,包含Driver、HeadNode(Master)和多少Slave。

import ray
ray.init()

一经是直连已有些Ray集群,只要求指定RedisServer的地点即可。

ray.init(redis_address="<redis-address>")

当地启动Ray获得的输出如下:

>>> ray.init()
Waiting for redis server at 127.0.0.1:58807 to respond...
Waiting for redis server at 127.0.0.1:23148 to respond...
Allowing the Plasma store to use up to 13.7439GB of memory.
Starting object store with directory /tmp and huge page support disabled
Starting local scheduler with 8 CPUs, 0 GPUs

======================================================================
View the web UI at http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5
======================================================================

{'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store73540254', manager_name='/tmp/plasma_manager78072648', manager_port=39874)], 'redis_address': '127.0.0.1:58807', 'local_scheduler_socket_names': ['/tmp/scheduler98624129'], 'webui_url': 'http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5', 'node_ip_address': '127.0.0.1'}
>>> 

本地启动Ray时,可以看出Ray的WebUI的拜访地址。

下面是客户端程序的测试运算步骤:

三、焦点操作

按照上述架构,我们简要啄磨一下Ray中举足轻重的操作和流程。

 

7. Actor

Ray的remote函数只可以处理无状态的测算需要,有事态的计算必要要求接纳Ray的Actor达成。在Python的class定义前应用@ray.remote可以申明Actor。

@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

应用如下格局创立Actor对象。

a1 = Counter.remote()
a2 = Counter.remote()

Ray成立Actor的流程为:

  1. Master选用一个Slave,并将Actor创设职分分发给它的地面调度器。
  2. 创办Actor对象,并举行它的构造函数。

从流水线能够见见,Actor对象的制造时互动的。

经过调用Actor对象的法门应用Actor。

a1.increment.remote()  # ray.get returns 1
a2.increment.remote()  # ray.get returns 1

调用Actor对象的措施的流水线为:

  1. 率先创制一个职分。
  2. 该职责被Driver直接分配到成立该Actor对应的本土执行器执行,那些操作绕开了全局调度器(Worker是不是也足以选用Actor直接分配任务尚存疑问)。
  3. 回来Actor方法调用结果的ObjectID。

为了保险Actor状态的一致性,对同一个Actor的格局调用是串行执行的。

那几个函数模拟揭橥事件数量是某种业务运算结果,在那里代表客户端需求下载文件名称。大家用客户端request来效仿设定这几个文件名称:

3. ray.get()

使用ray.get()可以由此ObjectID获取ObjectStore内的目的并将之转换为Python对象。对于数组类型的靶子,Ray使用共享内存机制缩短数量的正片成本。而对此任何对象则要求将数据从ObjectStore拷贝到进程的堆内存中。

假设调用ray.get()操作时,对象尚未创立好,则get操作会阻塞,直到对象创设完成后回到。get操作的要害流程如下:

  1. Driver或者Worker进程首先到ObjectStore内请求ObjectID对应的靶子数据。
  2. 万一地点ObjectStore没有对号入座的靶子数据,本地对象管理器Plasma会检讨Master上的对象表查看对象是不是存储别的节点的ObjectStore。
  3. 假如目标数据在其它节点的ObjectStore内,Plasma会发送网络请求将对象数据拉到本地ObjectStore。
  4. 要是目的数据还并未创建好,Master会在目的成立落成后文告请求的Plasma读取。
  5. 如果目的数据现已被抱有的ObjectStore移除(被LRU策略删除),本地调度器会按照义务血缘关系执行对象的重复创造工作。
  6. 若果目的数据在本地ObjectStore可用,Driver或者Worker进度会通过共享内存的办法直接将目的内存区域映射到自己的长河地址空间中,并反连串化为Python对象。

另外,ray.get()可以一遍性读取三个目的的数据:

result_ids = [ray.put(i) for i in range(10)]
ray.get(result_ids)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

客户端订阅SSE的不二法门如下:

6. ray.error_info()

使用ray.error_info()可以得到职责执行时发出的错误音讯。

>>> import time
>>> @ray.remote
>>> def f():
>>>     time.sleep(5)
>>>     raise Exception("This task failed!!")
>>> f.remote()
Remote function __main__.f failed with:

Traceback (most recent call last):
  File "<stdin>", line 4, in f
Exception: This task failed!!


  You can inspect errors by running

      ray.error_info()

  If this driver is hanging, start a new one with

      ray.init(redis_address="127.0.0.1:65452")
>>> ray.error_info()
[{'type': 'task', 'message': 'Remote function \x1b[31m__main__.f\x1b[39m failed with:\n\nTraceback (most recent call last):\n  File "<stdin>", line 4, in f\nException: This task failed!!\n', 'data': '{\'function_id\': "Hm\\xde\\x93\'\\x91\\xce\\x13ld\\xf4O\\xd7\\xce\\xc2\\xe1\\x151\\x1e3", \'function_name\': u\'__main__.f\'}'}]

 

二、系统架构

Ray是应用什么的架构对分布式统计做出如上抽象的啊,一下交给了Ray的连串架构(来自Ray随想,参考文献1)。

管理 3

用作分布式统计系统,Ray依旧根据了非凡的Master-Slave的布署:Master负责全局协调和景色维护,Slave执行分布式总结职分。不过和价值观的分布式计算系统分化的是,Ray使用了掺杂职分调度的思绪。在集群陈设形式下,Ray启动了以下重点零部件:

  1. GlobalScheduler:Master上启动了一个大局调度器,用于吸纳本地调度器提交的义务,并将职分分发给方便的地面义务调度器执行。
  2. RedisServer:Master上启动了一到多少个RedisServer用于保存分布式职务的情事音讯(ControlState),包罗对象机器的照耀、任务描述、职分debug音讯等。
  3. LocalScheduler:每个Slave上启动了一个本土调度器,用于提交义务到全局调度器,以及分配任务给当下机械的Worker进度。
  4. Worker:每个Slave上可以启动多个Worker进度执行分布式义务,并将总计结果存储到ObjectStore。
  5. ObjectStore:每个Slave上启动了一个ObjectStore存储只读数据对象,Worker可以通过共享内存的方法访问这一个目标数据,那样可以使得地压缩内存拷贝和目标体系化花费。ObjectStore底层由Apache
    Arrow完成。
  6. Plasma:每个Slave上的ObjectStore都由一个名为Plasma的靶子管理器进行保管,它可以在Worker访问本地ObjectStore上不设有的长途数据对象时,主动拉取其余Slave上的目的数据到当前机械。

须求表达的是,Ray的舆论中提及,全局调度器可以启动一到七个,而眼前Ray的兑现文档里研究的内容都是依据一个大局调度器的情景。我揣测可能是Ray尚在建设中,一些体制还未周密,后续读者可以小心此处的底细变化。

Ray的义务也是经过类似斯帕克中Driver的概念的办法展开提交的,有所差别的是:

  1. 斯帕克的Driver提交的是天职DAG,一旦付出则不得更改。
  2. 而Ray提交的是更细粒度的remote
    function,义务DAG看重关系由函数依赖关系自由定制。

舆论给出的架构图里没有画出Driver的定义,由此我在其基础上做了有的改动和扩张。

管理 4

Ray的Driver节点和和Slave节点启动的机件大致同一,然则却有以下分别:

  1. Driver上的行事进程DriverProcess一般惟有一个,即用户启动的PythonShell。Slave可以依照要求创建多少个WorkerProcess。
  2. Driver只可以交给职分,却不可以收到来自全局调度器分配的职责。Slave可以交到职分,也可以吸纳全局调度器分配的义务。
  3. Driver能够主动绕过全局调度器给Slave发送Actor调用职分(此处设计是或不是合理尚不商量)。Slave只好接受全局调度器分配的测算任务。

 

 

运算结果:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration.DurationInt
import akka.http.scaladsl.model.sse.ServerSentEvent

object SSEServer {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val mat    = ActorMaterializer()
    Http().bindAndHandle(route, "localhost", 8011)

    scala.io.StdIn.readLine()
    system.terminate()
  }

  object SyncFiles {
    var fileToSync: String = ""
  }
  private def route = {
    import Directives._
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    def syncRequests =
      pathPrefix("sync") {
        pathSingleSlash {
        post {
            parameter("file") { filename =>
              complete {
                SyncFiles.fileToSync = filename
                s"set download file to : $filename"
              }
            }
          }
        }
      }

    def events =
      path("events") {
        get {
          complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map( _ => processToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }
        }
      }

    syncRequests ~ events
  }

  private def processToServerSentEvent: ServerSentEvent = {
    Thread.sleep(3000)   //processing delay
    ServerSentEvent(SyncFiles.fileToSync)
  }
}

服务端的SSE发表是以Source[ServerSentEvent,NotUsed]来落到实处的。ServerSent伊芙(Eve)nt类型定义如下:

  def downloadFiles(file: String) = {
    Thread.sleep(3000)   //process delay
    if (file != "")
      println(s"Try to download $file")
  }

以此类型的参数代表事件音讯的数据结构。用户可以根据实际须求丰盛利用这些数据结构来传递音讯。服务端是经过complete以SeverSent伊芙nt类为因素的Source来举行SSE的,如下:

    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
    import system.dispatcher

    Http()
      .singleRequest(Get("http://localhost:8011/events"))
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
      .foreach(_.runForeach(se => downloadFiles(se.data)))
do some thing ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:50:52 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Orders),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders

do some other things ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:51:02 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Items),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders
Try to download Items
Try to download Items

Try to download Items

Process finished with exit code 0

 

 

每当客户端收到SSE后即运行downloadFiles(filename)函数。downloadFiles函数定义:

 
 因为自身询问Akka-http的要紧目的不是为着有关Web-Server的编程,而是想完成一套系统融为一体的api,所以也亟需考虑由服务端主动向客户端发送指令的行使场景。比如一个零售店管理平台的服务端在成功了一点数据更新后需求通告各零售门市客户端下载最新数据。即便Akka-http也提供对websocket探讨的辅助,但websocket的网络连接是双向恒久的,适合频仍的问答交互式服务端与客户端的沟通,音讯结构也正如零碎。而我辈面临的或者是批次型的恢宏数据库数据互换,只须要不难的劳动端单向音讯就行了,所以websocket不太适宜,而Akka-http的SSE应该比较吻合我们的须求。SSE情势的基本原理是服务端统一集中表露音信,各客户端持久订阅服务端发表的音信并从信息的情节中筛选出属于自己应有履行的一声令下,然后举行相应的拍卖。客户端接收SSE是在一个独门的线程里持续拓展的,不会影响客户端当前的运算流程。当接过有效的消息后就会调用一个事情功用函数作为后台异步运算职分。

客户端:

  object SyncFiles {
    var fileToSync: String = ""
  }
  private def route = {
    import Directives._
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    def syncRequests =
      pathPrefix("sync") {
        pathSingleSlash {
        post {
            parameter("file") { filename =>
              complete {
                SyncFiles.fileToSync = filename
                s"set download file to : $filename"
              }
            }
          }
        }
      }
    scala.io.StdIn.readLine()
    println("do some thing ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
    ).onSuccess {
      case msg => println(msg)
    }

    scala.io.StdIn.readLine()
    println("do some other things ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
    ).onSuccess {
      case msg => println(msg)
    }

 

上边是本次探讨的示范源代码:

  private def processToServerSentEvent: ServerSentEvent = {
    Thread.sleep(3000)   //processing delay
    ServerSentEvent(SyncFiles.fileToSync)
  }

 

/**
 * Representation of a server-sent event. According to the specification, an empty data field designates an event
 * which is to be ignored which is useful for heartbeats.
 *
 * @param data data, may span multiple lines
 * @param eventType optional type, must not contain \n or \r
 * @param id optional id, must not contain \n or \r
 * @param retry optional reconnection delay in milliseconds
 */
final case class ServerSentEvent(
  data:      String,
  eventType: Option[String] = None,
  id:        Option[String] = None,
  retry:     Option[Int]    = None) {...}

Post Author: admin

发表评论

电子邮件地址不会被公开。 必填项已用*标注