MoonBit pearls vol.4: Exploring Choreographic Programming with MoonBit (Part 2)

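This article uses moonchor, a choreographic programming library for MoonBit, to explain the core ideas and basic usage of choreographic programming through several examples. In the previous part, we used a bookstore application to show how MoonBit puts choreographic programming into practice.

Case study: a multi-replica KVStore

In this section we look at a more complex case: implementing a multi-replica KVStore with moonchor. We still use only moonchor's core API, but we take full advantage of two MoonBit features, generics and first-class functions. Our goal is to explore how much MoonBit's expressive power can open up for choreographic programming.

Basic implementation

We start with some preparation by defining two roles, the client Client and the server Server: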

struct Server {} derive(Eq, Hash, Show)

struct Client {} derive(Eq, Hash, Show)

impl @moonchor.Location for Server with name(_) {
  "server"
}

impl @moonchor.Location for Client with name(_) {
  "client"
}

let server : Server = Server::{  }

let client : Client = Client::{  }

To implement a KVStore such as Redis, we need its two most basic operations: get and put (corresponding to Redis's get and set). The simplest implementation stores the key-value pairs in a Map:

struct ServerState {
  db : Map[String, Int]
}

fn ServerState::new() -> ServerState {
  { db: {} }
}

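For a KVStore, get and put requests are sent by the client over the network, and we do not know what a request contains until we receive it. So we define a request type, Request, that carries the kind of request and its arguments: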

enum Request {
  Get(String)
  Put(String, Int)
} derive(ToJson, FromJson)

For simplicity, our KVStore only supports String keys and Int values. Next we define a Response type that represents the server's response to a request:

typealias Int? as Response

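A response is an optional integer. For a Put request the response is None. For a Get request the response is the value stored under the key wrapped in Some, or None if the key does not exist.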

fn handle_request(state : ServerState, request : Request) -> Response {
  match request {
    Request::Get(key) => state.db.get(key)
    Request::Put(key, value) => {
      state.db[key] = value
      None
    }
  }
}
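The Response convention described above can be checked without any choreography at all. The following is a minimal sketch, assuming a stand-alone copy of the handler: `DemoRequest` and `demo_handle` are hypothetical names introduced only for this illustration, and the store is passed as a bare `Map` instead of a `ServerState`.

```moonbit
// Hypothetical stand-alone copy of the request type (not part of moonchor).
enum DemoRequest {
  Get(String)
  Put(String, Int)
}

// Same logic as handle_request, but operating directly on a Map.
fn demo_handle(db : Map[String, Int], request : DemoRequest) -> Int? {
  match request {
    DemoRequest::Get(key) => db.get(key)
    DemoRequest::Put(key, value) => {
      db[key] = value
      None
    }
  }
}

test "demo_handle follows the Response convention" {
  let db : Map[String, Int] = {}
  // A Put always answers None.
  assert_eq(demo_handle(db, DemoRequest::Put("key1", 42)), None)
  // A Get of a stored key answers Some(value).
  assert_eq(demo_handle(db, DemoRequest::Get("key1")), Some(42))
  // A Get of a missing key answers None.
  assert_eq(demo_handle(db, DemoRequest::Get("missing")), None)
}
```

Note that None is overloaded here: a client cannot distinguish "Put succeeded" from "key not found" by the value alone, only by knowing which request it sent.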

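Our goal is to define two functions, put and get, that model a client issuing requests. Each of them does the following:

  1. Build the request at Client, wrapping the key (and, for put, the value);
  2. Send the request to Server;
  3. Server handles the request with handle_request;
  4. Send the response back to Client.

As you can see, put and get share the same logic, so we can factor steps 2, 3, and 4 into one function called access_server: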

async fn put_v1(
  ctx : @moonchor.ChoreoContext,
  state_at_server : @moonchor.Located[ServerState, Server],
  key : String,
  value : Int
) -> Unit {
  let request = ctx.locally(client, _unwrapper => Request::Put(key, value))
  access_server_v1(ctx, request, state_at_server) |> ignore
}

async fn get_v1(
  ctx : @moonchor.ChoreoContext,
  state_at_server : @moonchor.Located[ServerState, Server],
  key : String
) -> @moonchor.Located[Response, Client] {
  let request = ctx.locally(client, _unwrapper => Request::Get(key))
  access_server_v1(ctx, request, state_at_server)
}

async fn access_server_v1(
  ctx : @moonchor.ChoreoContext,
  request : @moonchor.Located[Request, Client],
  state_at_server : @moonchor.Located[ServerState, Server]
) -> @moonchor.Located[Response, Client] {
  let request_at_server = ctx.comm(client, server, request)
  let response = ctx.locally(server, fn(unwrapper) {
    let request = unwrapper.unwrap(request_at_server)
    let state = unwrapper.unwrap(state_at_server)
    handle_request(state, request)
  })
  ctx.comm(server, client, response)
}

With that, our KVStore is complete. We can write a simple choreography to test it:

async fn kvstore_v1(ctx : @moonchor.ChoreoContext) -> Unit {
  let state_at_server = ctx.locally(server, _unwrapper => ServerState::new())
  put_v1(ctx, state_at_server, "key1", 42)
  put_v1(ctx, state_at_server, "key2", 41)
  let v1_at_client = get_v1(ctx, state_at_server, "key1")
  let v2_at_client = get_v1(ctx, state_at_server, "key2")
  ctx.locally(client, fn(unwrapper) {
    let v1 = unwrapper.unwrap(v1_at_client).unwrap()
    let v2 = unwrapper.unwrap(v2_at_client).unwrap()
    if v1 + v2 == 83 {
      println("The server is working correctly")
    } else {
      panic()
    }
  })
  |> ignore
}

test "kvstore v1" {
  let backend = @moonchor.make_local_backend([server, client])
  @toolkit.run_async(() => @moonchor.run_choreo(backend, kvstore_v1, server))
  @toolkit.run_async(() => @moonchor.run_choreo(backend, kvstore_v1, client))
}

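This program stores the numbers 42 and 41 under "key1" and "key2", then fetches both values from the server and checks that their sum equals 83. If either request returns None, or the sum is not 83, the program panics.

Two replicas

Now consider adding fault tolerance to the KVStore. The simplest approach is to add a backup replica that holds the same data as the primary, and to check that the primary and the backup agree when handling a Get request.

We create a new role for the backup replica: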

struct Backup {} derive(Eq, Hash, Show)

impl @moonchor.Location for Backup with name(_) {
  "backup"
}

let backup : Backup = Backup::{  }

Next we define a function that checks consistency: it checks whether the responses from all replicas agree, and panics if they do not.

fn check_consistency(responses : Array[Response]) -> Unit {
  match responses.pop() {
    None => return
    Some(f) =>
      for res in responses {
        if res != f {
          panic()
        }
      }
  }
}
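Because check_consistency panics on a mismatch, it is awkward to exercise in a unit test. The sketch below is a hypothetical non-panicking variant, `all_consistent`, introduced only for illustration. It follows the same pop-then-compare structure and returns a Bool instead.

```moonbit
// Hypothetical variant of the consistency check that reports the result
// instead of panicking. Like the original, it pops the last response and
// compares every remaining response against it.
fn all_consistent(responses : Array[Int?]) -> Bool {
  match responses.pop() {
    // No responses at all: trivially consistent.
    None => true
    Some(last) => {
      for res in responses {
        if res != last {
          return false
        }
      }
      true
    }
  }
}

test "all_consistent" {
  assert_eq(all_consistent([Some(42), Some(42), Some(42)]), true)
  assert_eq(all_consistent([Some(42), None]), false)
  assert_eq(all_consistent([]), true)
}
```

One detail worth keeping in mind: pop removes the last element, so the array passed in is one element shorter afterwards. That is harmless when the caller passes a fresh array literal, as the choreographies in this article do.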

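Most of the rest needs no changes; we only need to add handling for the replica in access_server. The new access_server_v2 works as follows: after Server receives the request, it forwards the request to Backup; Server and Backup then each handle the request; Backup sends its result back to Server, and Server checks the two results for consistency.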

async fn put_v2(
  ctx : @moonchor.ChoreoContext,
  state_at_server : @moonchor.Located[ServerState, Server],
  state_at_backup : @moonchor.Located[ServerState, Backup],
  key : String,
  value : Int
) -> Unit {
  let request = ctx.locally(client, _unwrapper => Request::Put(key, value))
  access_server_v2(ctx, request, state_at_server, state_at_backup) |> ignore
}

async fn get_v2(
  ctx : @moonchor.ChoreoContext,
  state_at_server : @moonchor.Located[ServerState, Server],
  state_at_backup : @moonchor.Located[ServerState, Backup],
  key : String
) -> @moonchor.Located[Response, Client] {
  let request = ctx.locally(client, _unwrapper => Request::Get(key))
  access_server_v2(ctx, request, state_at_server, state_at_backup)
}

async fn access_server_v2(
  ctx : @moonchor.ChoreoContext,
  request : @moonchor.Located[Request, Client],
  state_at_server : @moonchor.Located[ServerState, Server],
  state_at_backup : @moonchor.Located[ServerState, Backup]
) -> @moonchor.Located[Response, Client] {
  let request_at_server = ctx.comm(client, server, request)
  let request_at_backup = ctx.comm(server, backup, request_at_server)
  let response_at_backup = ctx.locally(backup, fn(unwrapper) {
    let request = unwrapper.unwrap(request_at_backup)
    let state = unwrapper.unwrap(state_at_backup)
    handle_request(state, request)
  })
  let backup_response_at_server = ctx.comm(backup, server, response_at_backup)
  let response_at_server = ctx.locally(server, fn(unwrapper) {
    let request = unwrapper.unwrap(request_at_server)
    let state = unwrapper.unwrap(state_at_server)
    let response = handle_request(state, request)
    let backup_response = unwrapper.unwrap(backup_response_at_server)
    check_consistency([response, backup_response])
    response
  })
  ctx.comm(server, client, response_at_server)
}

As before, we can write a simple choreography to test it:

async fn kvstore_v2(ctx : @moonchor.ChoreoContext) -> Unit {
  let state_at_server = ctx.locally(server, _unwrapper => ServerState::new())
  let state_at_backup = ctx.locally(backup, _unwrapper => ServerState::new())
  put_v2(ctx, state_at_server, state_at_backup, "key1", 42)
  put_v2(ctx, state_at_server, state_at_backup, "key2", 41)
  let v1_at_client = get_v2(ctx, state_at_server, state_at_backup, "key1")
  let v2_at_client = get_v2(ctx, state_at_server, state_at_backup, "key2")
  ctx.locally(client, fn(unwrapper) {
    let v1 = unwrapper.unwrap(v1_at_client).unwrap()
    let v2 = unwrapper.unwrap(v2_at_client).unwrap()
    if v1 + v2 == 83 {
      println("The server is working correctly")
    } else {
      panic()
    }
  })
  |> ignore
}

test "kvstore 2.0" {
  let backend = @moonchor.make_local_backend([server, client, backup])
  @toolkit.run_async(() => @moonchor.run_choreo(backend, kvstore_v2, server))
  @toolkit.run_async(() => @moonchor.run_choreo(backend, kvstore_v2, client))
  @toolkit.run_async(() => @moonchor.run_choreo(backend, kvstore_v2, backup))
}

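Abstracting the replication strategy with higher-order functions

The two-replica implementation introduced some coupling: the code for Server handling the request, backing up the request, and checking the results for consistency is all mixed together.

With MoonBit's higher-order functions, we can separate the replication strategy from the concrete request handling. Let us first consider what a replication strategy is. It should describe a process: how the server, once it has a request, uses the replicas to handle it. The key point is that the strategy itself is independent of the request and should be pulled out of the concrete handling logic. That makes the replication strategy a replaceable part, so we can later switch between strategies or implement new ones easily.

Real-world replication strategies are of course very complex and are often hard to separate cleanly from the processing flow. In this example, to keep things simple and to focus on moonchor's programming capabilities, we define a replication strategy directly as the function that decides how Server handles a request after receiving it. We can define it with a type alias: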

typealias async (
  @moonchor.ChoreoContext,
  @moonchor.Located[Request, Server]
) -> @moonchor.Located[Response, Server] as ReplicationStrategy

Now we can simplify access_server by passing the strategy in as a parameter:

async fn access_server_v3(
  ctx : @moonchor.ChoreoContext,
  request : @moonchor.Located[Request, Client],
  strategy : ReplicationStrategy
) -> @moonchor.Located[Response, Client] {
  let request_at_server = ctx.comm(client, server, request)
  let response = strategy(ctx, request_at_server)
  ctx.comm(server, client, response)
}

async fn put_v3(
  ctx : @moonchor.ChoreoContext,
  strategy : ReplicationStrategy,
  key : String,
  value : Int
) -> Unit {
  let request = ctx.locally(client, _unwrapper => Request::Put(key, value))
  access_server_v3(ctx, request, strategy) |> ignore
}

async fn get_v3(
  ctx : @moonchor.ChoreoContext,
  strategy : ReplicationStrategy,
  key : String
) -> @moonchor.Located[Response, Client] {
  let request = ctx.locally(client, _unwrapper => Request::Get(key))
  access_server_v3(ctx, request, strategy)
}

The replication strategy is now separated from the request handling logic. Next, we reimplement the two-replica strategy:

async fn double_replication_strategy(
  state_at_server : @moonchor.Located[ServerState, Server],
  state_at_backup : @moonchor.Located[ServerState, Backup],
) -> ReplicationStrategy {
  fn(
    ctx : @moonchor.ChoreoContext,
    request_at_server : @moonchor.Located[Request, Server]
  ) {
    let request_at_backup = ctx.comm(server, backup, request_at_server)
    let response_at_backup = ctx.locally(backup, fn(unwrapper) {
      let request = unwrapper.unwrap(request_at_backup)
      let state = unwrapper.unwrap(state_at_backup)
      handle_request(state, request)
    })
    let backup_response = ctx.comm(backup, server, response_at_backup)
    ctx.locally(server, fn(unwrapper) {
      let request = unwrapper.unwrap(request_at_server)
      let state = unwrapper.unwrap(state_at_server)
      let res = handle_request(state, request)
      check_consistency([unwrapper.unwrap(backup_response), res])
      res
    })
  }
}

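Note the signature of double_replication_strategy: it returns a function of type ReplicationStrategy. Given its two arguments, double_replication_strategy constructs a new replication strategy. We have thus abstracted the replication strategy using higher-order functions; in choreographic programming this feature is called a higher-order choreography.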
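The same "factory returns a strategy" shape can be seen in plain MoonBit with the choreography removed. The sketch below is only a local model under stated assumptions: `KvRequest`, `apply_request`, `single_copy`, `two_copies`, and `access` are hypothetical names, a strategy is simplified to an ordinary function from a request to a response, and the replicas are plain maps rather than located values.

```moonbit
// Hypothetical local model: no moonchor, no network, replicas are Maps.
enum KvRequest {
  Get(String)
  Put(String, Int)
}

fn apply_request(db : Map[String, Int], request : KvRequest) -> Int? {
  match request {
    KvRequest::Get(key) => db.get(key)
    KvRequest::Put(key, value) => {
      db[key] = value
      None
    }
  }
}

// Factory: captures one store and returns a strategy.
fn single_copy(db : Map[String, Int]) -> (KvRequest) -> Int? {
  fn(request) { apply_request(db, request) }
}

// Factory: captures two stores, applies the request to both, and panics
// if the two answers disagree.
fn two_copies(
  primary : Map[String, Int],
  backup : Map[String, Int]
) -> (KvRequest) -> Int? {
  fn(request) {
    let backup_response = apply_request(backup, request)
    let response = apply_request(primary, request)
    if response != backup_response {
      panic()
    }
    response
  }
}

// The access path only sees a strategy, never the replicas behind it.
fn access(strategy : (KvRequest) -> Int?, request : KvRequest) -> Int? {
  strategy(request)
}

test "strategies are interchangeable" {
  let single : Map[String, Int] = {}
  let primary : Map[String, Int] = {}
  let backup : Map[String, Int] = {}
  let one = single_copy(single)
  let two = two_copies(primary, backup)
  assert_eq(access(one, KvRequest::Put("key1", 42)), None)
  assert_eq(access(one, KvRequest::Get("key1")), Some(42))
  assert_eq(access(two, KvRequest::Put("key1", 42)), None)
  assert_eq(access(two, KvRequest::Get("key1")), Some(42))
  // The two-copy strategy wrote to the backup store as well.
  assert_eq(backup.get("key1"), Some(42))
}
```

As in the choreographic version, `access` never changes when a new strategy is added; only a new factory is written.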

Again, we can write a simple choreography to test it:

async fn kvstore_v3(ctx : @moonchor.ChoreoContext) -> Unit {
  let state_at_server = ctx.locally(server, _unwrapper => ServerState::new())
  let state_at_backup = ctx.locally(backup, _unwrapper => ServerState::new())
  let strategy = double_replication_strategy(state_at_server, state_at_backup)
  put_v3(ctx, strategy, "key1", 42)
  put_v3(ctx, strategy, "key2", 41)
  let v1_at_client = get_v3(ctx, strategy, "key1")
  let v2_at_client = get_v3(ctx, strategy, "key2")
  ctx.locally(client, fn(unwrapper) {
    let v1 = unwrapper.unwrap(v1_at_client).unwrap()
    let v2 = unwrapper.unwrap(v2_at_client).unwrap()
    if v1 + v2 == 83 {
      println("The server is working correctly")
    } else {
      panic()
    }
  })
  |> ignore
}

test "kvstore 3.0" {
  let backend = @moonchor.make_local_backend([server, client, backup])
  @toolkit.run_async(() => @moonchor.run_choreo(backend, kvstore_v3, server))
  @toolkit.run_async(() => @moonchor.run_choreo(backend, kvstore_v3, client))
  @toolkit.run_async(() => @moonchor.run_choreo(backend, kvstore_v3, backup))
}

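Role polymorphism through parametric polymorphism

To go further and implement a new replication strategy, for example three replicas, we need to define two new Backup types to tell the backups apart: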

struct Backup1 {} derive(Eq, Hash, Show)

impl @moonchor.Location for Backup1 with name(_) {
  "backup1"
}

let backup1 : Backup1 = Backup1::{  }

struct Backup2 {} derive(Eq, Hash, Show)

impl @moonchor.Location for Backup2 with name(_) {
  "backup2"
}

let backup2 : Backup2 = Backup2::{  }

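Next we need to modify the core logic of access_server. A problem shows up immediately: for both Backup1 and Backup2 to handle the request and produce a response, we would have to repeat the following statements: let request = unwrapper.unwrap(request_at_backup); let state = unwrapper.unwrap(state_at_backup); handle_request(state, request). Duplicated code is a code smell and should be abstracted away. This is where moonchor's "roles as types" design pays off: using MoonBit's parametric polymorphism, we can abstract the backup handling logic into a polymorphic function do_backup that takes a role type parameter B representing the backup's role: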

async fn[B : @moonchor.Location] do_backup(
  ctx : @moonchor.ChoreoContext,
  request_at_server : @moonchor.Located[Request, Server],
  backup : B,
  state_at_backup : @moonchor.Located[ServerState, B]
) -> @moonchor.Located[Response, Server] {
  let request_at_backup = ctx.comm(server, backup, request_at_server)
  let response_at_backup = ctx.locally(backup, fn(unwrapper) {
    let request = unwrapper.unwrap(request_at_backup)
    let state = unwrapper.unwrap(state_at_backup)
    handle_request(state, request)
  })
  ctx.comm(backup, server, response_at_backup)
}
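The mechanism do_backup relies on, a function generic over any type that implements a role trait, can be shown in isolation. This is a minimal sketch under stated assumptions: `Role` is a hypothetical stand-in for @moonchor.Location, and `ReplicaA`, `ReplicaB`, and `lookup_at` are illustrative names. The trait declaration follows the style of the trait signatures shown in this article, which may differ in other toolchain versions.

```moonbit
// Hypothetical stand-in for @moonchor.Location: a role knows its own name.
trait Role {
  name(Self) -> String
}

struct ReplicaA {}

struct ReplicaB {}

impl Role for ReplicaA with name(_) {
  "replica_a"
}

impl Role for ReplicaB with name(_) {
  "replica_b"
}

// One generic function serves every role type: R is fixed per call site,
// just as B is fixed to Backup1 or Backup2 in each call to do_backup.
fn[R : Role] lookup_at(
  role : R,
  db : Map[String, Int],
  key : String
) -> (String, Int?) {
  (R::name(role), db.get(key))
}

test "one function, two role types" {
  let db : Map[String, Int] = {}
  db["key1"] = 42
  assert_eq(lookup_at(ReplicaA::{  }, db, "key1"), ("replica_a", Some(42)))
  assert_eq(lookup_at(ReplicaB::{  }, db, "missing"), ("replica_b", None))
}
```

In do_backup the type parameter does more than select a name: it also ties state_at_backup to the same role B, so passing Backup1's state together with the backup2 value would be rejected at compile time.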

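Now we can implement a two-replica or three-replica strategy at will. For the three-replica strategy, we only need to call do_backup twice inside the function returned by triple_replication_strategy: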

async fn triple_replication_strategy(
  state_at_server : @moonchor.Located[ServerState, Server],
  state_at_backup1 : @moonchor.Located[ServerState, Backup1],
  state_at_backup2 : @moonchor.Located[ServerState, Backup2]
) -> ReplicationStrategy {
  fn(
    ctx : @moonchor.ChoreoContext,
    request_at_server : @moonchor.Located[Request, Server]
  ) {
    let backup_response1 = do_backup(
      ctx,
      request_at_server,
      backup1,
      state_at_backup1,
    )
    let backup_response2 = do_backup(
      ctx,
      request_at_server,
      backup2,
      state_at_backup2,
    )
    ctx.locally(server, fn(unwrapper) {
      let request = unwrapper.unwrap(request_at_server)
      let state = unwrapper.unwrap(state_at_server)
      let res = handle_request(state, request)
      check_consistency([
        unwrapper.unwrap(backup_response1),
        unwrapper.unwrap(backup_response2),
        res,
      ])
      res
    })
  }
}
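For comparison, a two-replica strategy built on the same do_backup helper could be sketched as follows. This is only an illustration, not a standalone program: it assumes the definitions used throughout this article (do_backup, handle_request, check_consistency, ReplicationStrategy, ServerState, Request, and the server and backup1 values) are in scope, and the name double_replication_strategy_sketch is hypothetical.

```moonbit
// Hypothetical two-replica variant; relies on definitions from this article.
async fn double_replication_strategy_sketch(
  state_at_server : @moonchor.Located[ServerState, Server],
  state_at_backup1 : @moonchor.Located[ServerState, Backup1]
) -> ReplicationStrategy {
  fn(
    ctx : @moonchor.ChoreoContext,
    request_at_server : @moonchor.Located[Request, Server]
  ) {
    // A single call to do_backup: the role type parameter B is inferred as Backup1.
    let backup_response = do_backup(
      ctx,
      request_at_server,
      backup1,
      state_at_backup1,
    )
    ctx.locally(server, fn(unwrapper) {
      let request = unwrapper.unwrap(request_at_server)
      let state = unwrapper.unwrap(state_at_server)
      let res = handle_request(state, request)
      // Compare the backup's response with the primary's own result.
      check_consistency([unwrapper.unwrap(backup_response), res])
      res
    })
  }
}
```

The only differences from the three-replica version are the number of do_backup calls and the length of the array passed to check_consistency, which is the point of making the backup role a type parameter.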

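Because we have successfully separated the replication strategy from the access process, the access_server, put, and get functions need no changes. Let's test the final KVStore: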

async fn kvstore_v4(ctx : @moonchor.ChoreoContext) -> Unit {
  let state_at_server = ctx.locally(server, _unwrapper => ServerState::new())
  let state_at_backup1 = ctx.locally(backup1, _unwrapper => ServerState::new())
  let state_at_backup2 = ctx.locally(backup2, _unwrapper => ServerState::new())
  let strategy = triple_replication_strategy(
    state_at_server,
    state_at_backup1,
    state_at_backup2,
  )
  put_v3(ctx, strategy, "key1", 42)
  put_v3(ctx, strategy, "key2", 41)
  let v1_at_client = get_v3(ctx, strategy, "key1")
  let v2_at_client = get_v3(ctx, strategy, "key2")
  ctx.locally(client, fn(unwrapper) {
    let v1 = unwrapper.unwrap(v1_at_client).unwrap()
    let v2 = unwrapper.unwrap(v2_at_client).unwrap()
    if v1 + v2 == 83 {
      println("The server is working correctly")
    } else {
      panic()
    }
  })
  |> ignore
}

test "kvstore 4.0" {
  let backend = @moonchor.make_local_backend([server, client, backup1, backup2])
  @toolkit.run_async(() => @moonchor.run_choreo(backend, kvstore_v4, server))
  @toolkit.run_async(() => @moonchor.run_choreo(backend, kvstore_v4, client))
  @toolkit.run_async(() => @moonchor.run_choreo(backend, kvstore_v4, backup1))
  @toolkit.run_async(() => @moonchor.run_choreo(backend, kvstore_v4, backup2))
}

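This completes the multi-replica KVStore. In this example, we never manually used send or recv to express interactions between distributed nodes. Instead, all communication and synchronization were implemented through moonchor's choreographic programming facilities, which avoids potential type errors, deadlocks, and explicit synchronization problems.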

Conclusion

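In this article, we used moonchor to experience the appeal of choreographic programming and saw MoonBit's expressive power along the way. For more details on choreographic programming, see the Haskell library HasChor, the Choral language, and the moonchor source code. To try moonchor yourself, install it with the command moon add Milky2018/moonchor@0.15.0.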