首页 » Go » 正文

Go 语言集训(四):并发

并发编程一般采用共享内存系统和消息传递系统。Go语言采用消息传递系统。即“不要通过共享内存来通信,而应该通过通信来共享内存

Channel

Channel 是Go语言提供的goroutine间的通信方式。可以使用channel在两个或者多个goroutine之间传递消息。channel是进程内的通信方式,因此通过channel传递对象的过程和调用函数时的参数传递行为比较一致,比如也可以传递指针。如果需要跨进程通信,我们建议用分布式系统的方法来解决,比如使用Socket或者HTTP等通信协议。

Channel是类型相关的。也就是说,一个channel智能传递一种类型的值,这个烈性需要在声明channel时制定。可以将其认为是一种类型安全的管道。

select

通过调用select()函数来监控一系列的文件句柄,一旦其中一个文件句柄发生了IO动作,该select()调用就会被返回。GO语言在语言级别支持select关键字,用于处理异步IO问题。
select与switch非常类似,由select开始一个新的选择快,每个选择条件由case语句来描述。与switch语句可以选择任何可使用相等比较的条件相比,select有比较多的限制,其中最大的一条限制是每个case语句里必须是一个IO操作。

select {
    case <- chan1:
    //如果chanl成功读到数据,则进行该case语句
    case chan2 <- 1:
    //如果成功向chan2写入数据,则进行case处理
    default:
    //如果上面都没有成功,则进入default处理。
}

缓冲机制

之前都是传递单个数据场景,如果持续传输大量数据的场景,就有些问题。需要给channel带上缓冲,从而达到消息队列的效果。
创建一个带缓冲的channel:
c := make(chan int, 1024)
调用make()时,缓冲区大小作为第二个参数传入即可。即使没有读取方,写入方也可以一直往channel里写入,在缓冲区被填写完之前,都不会被阻塞。

超时机制

如果发现channle已满,或者从channel读取数据时发现channel为空。如果不正确处理这些情况,很容易导致goroutine锁死。 Go语言没有提供直接的超时处理机制,但我们可以利用select机制。虽然select不是专为超时而设计的,却能很方便解决超时问题。因为select特点是只要其中一个case已经完成,程序会继续往下执行,而不会考虑其他case情况。

timeout := make(chan bool, 1)
go func() {
    time.Sleep(1e9)
    timeout <- true
}()
select {
    case <- ch:
    //从ch中读取到了数据
    case <- timeout:
    //一直没有从ch读取到数据,但从timeout读取到了数据
}

channel的传递

管道是一种非常广泛的一种设计模式,比如在处理数据时,我们采用管道设计,这样可以比较容易以插件的方式增加数据的处理流。

type PipeData struct {
    value int
    handler func(int) int
    next chan int
}

func handle(queue chan *PipeData) {
    for data := range queue {
        data.next <- data.handler(data.value)
    }
}

单向channel

单向channel 智能用于发送或者接受数据。channel本身必然是同时支持读写的,否则根本无法使用。加入一个channle真的只能读,那么只能为空,所以这是没有意义的。单向channle,其实只是对channel的一种使用限制。

我们在讲一个chanel变量传递到一个函数时,可以通过其指定为单向channel变量,从而限制该函数对此channel的操作。比如只能往这个channle写,或者读。

单向channel声明很简单。

var ch1 chan int //ch1是一个正常的chan,不是单向的
var ch2 chan<- float64 //ch2是一个单向channle,只能用于写float64数据
var ch3 <- chan int //ch3是一个只能读的channle,只能读int数据。

//初始化
ch4 := make(chan int)
ch5 := <-chan int(ch4) //ch5是一个单向读取的
ch6 := cha <- int(ch4) //ch6是一个单向写的

close(ch) //关闭channel

多核并行化

在执行一些昂贵的计算任务时,我们希望能够尽量利用现代服务器普遍具备的多核特性来尽量将任务并行化,从而降低总计算时间的目的。此时我们需要了解CPU核心的数量,并针对性地分解计算任务到多个goroutine中去并行运行。


type Vector []float64 func (v Vector) DoSome(i, n int, u Vector, c chan int) { for ; i < n; i++ { v[i] += u.Op(v[i]) } c <- 1 //任务执行完毕 } const NCPU = 16 func (v Vector) DoAll(u Vector) { c := make(chan int, NCPU) //用于接收每个CPU的任务完成信号 for i := 0; i < NCPU; i++ { go v.DoSome(i*len(v)/NCPU, (i+1)*len(v)/NCPU, u, c) } for i := 0; i < NCPU; i++ { <-c //获取每一个数据,表示一个CPU计算完成 } } //并没有发现有明显的加快,可以采用设置GOMAXPROCS的参数,设置并发。

同步

GO语言中的sync采用两种类型的锁。sync.Mutex 和 sync.RWMutex. Mutex是最简单的的锁,goroutine获得后,其他只能等待。RWMutex相对友好,是经典的单写多读模型。从实现上来看,RWMutex类型组合了Mutex。

type RWMutex struct {
    w Mutex
    writeSem unit32
    readSem unit32
    readCount int32
    readerWait int32
}

全局唯一性操作

对于从全局角度值需要运行一次代码,比如全局初始化操作,go语言提供了一个Once类型保证全局的唯一性操作。具体代码如下

var a string
var once sync.Once

func setup() {
    a = "hello world"
}

func doprint() {
    once.Do(setup)
    print(a)
}
func twopoint() {
    go doprint()
    go doprint()
}
// 没有引入once,则setup将会被每个goroutine先调用一次,至少对于这个例子是多余的。 once的Do方法可以保证全局范围内之只调用制定的函数次,而且所有其他goroutine在调用此举是,将会被阻塞。直到全剧唯一的once.Do()调用结束后才继续

案例

src
-cg:center.go centerclient.go player.go
-ipc:client.go server.og ipc_test.og
-cgss.go

cgss.go

package main

import (
    "bufio"
    "cg"
    "fmt"
    "ipc"
    "os"
    "strconv"
    "strings"
)

var centerClient *cg.CenterClient

func startCenterServer() error {
    server := ipc.NewIpcServer(&cg.CenterServer{})
    client := ipc.NewIpcClient(server)
    centerClient = &cg.CenterClient{client}
    return nil
}

func Help(args []string) int {
    fmt.Println(`
    Command:
    login <user><leve><exp>
    logout <user>
    send<mess>
    listplayer
    quit(q)
    help(h)
    `)
    return 0
}

func Quit(args []string) int {
    return 1
}

func Logout(args []string) int {
    if len(args) != 2 {
        fmt.Println("Usage : logout <username>")
        return 0
    }
    centerClient.RemovePlayer(args[1])
    return 0
}

func Login(args []string) int {
    if len(args) != 4 {
        fmt.Println("Usage:login...")
        return 0
    }
    level, err := strconv.Atoi(args[2])

    if err != nil {
        return 0
    }
    exp, err := strconv.Atoi(args[3])
    if err != nil {
        return 0
    }
    player := cg.NewPlayer()
    player.Name = args[1]
    player.Level = level
    player.Exp = exp
    err = centerClient.AddPlayer(player)
    return 0
}

func ListPlayer(args []string) int {
    ps, err := centerClient.ListPlayer("")
    if err != nil {
        fmt.Println("Failed", err)
    } else {
        for i, v := range ps {
            fmt.Println(i+1, ":", v)
        }
    }
    return 0
}

func Send(args []string) int {
    message := strings.Join(args[1:], " ")
    err := centerClient.Broadcast(message)
    if err != nil {
        fmt.Println("Failed", err)
    }
    return 0
}

func GetCommandHandlers() map[string]func(args []string) int {
    return map[string]func([]string) int{
        "help":       Help,
        "h":          Help,
        "login":      Login,
        "logout":     Logout,
        "listplayer": ListPlayer,
        "send":       Send,
        "quit":       Quit,
    }
}

func main() {
    fmt.Println("Game server soltuion")
    startCenterServer()
    Help(nil)
    r := bufio.NewReader(os.Stdin)

    handlers := GetCommandHandlers()

    for {

        fmt.Println("Command >")
        b, _, _ := r.ReadLine()
        line := string(b)
        tokens := strings.Split(line, " ")

        if handlers, ok := handlers[tokens[0]]; ok {
            ret := handlers(tokens)
            if ret != 0 {
                break
            }
        } else {
            fmt.Println("Unknow command", tokens[0])
        }
    }
}

center.go

package cg

import (
    "encoding/json"
    "errors"
    "fmt"
    "ipc"
    "sync"
)

var _ ipc.Server = &CenterServer{}

type Message struct {
    From    string "from"
    To      string "to"
    Content string "content"
}

type CenterServer struct {
    servers map[string]ipc.Server
    players []*Player
    //  rooms   []*Room
    mutex sync.RWMutex
}

func NewCenterServer() *CenterServer {
    servers := make(map[string]ipc.Server)
    players := make([]*Player, 0)
    return &CenterServer{servers: servers, players: players}
}

func (server *CenterServer) addPlayer(params string) error {
    player := NewPlayer()

    err := json.Unmarshal([]byte(params), &player)
    if err != nil {
        return err
    }
    server.mutex.Lock()
    defer server.mutex.Unlock()

    server.players = append(server.players, player)
    return nil
}

func (server *CenterServer) removePlayer(params string) error {
    server.mutex.Lock()
    defer server.mutex.Unlock()

    for i, v := range server.players {
        if v.Name == params {
            fmt.Println("dddd", len(server.players))
            if len(server.players) == 1 {
                server.players = make([]*Player, 0)
                fmt.Println("1111")
            } else if i == len(server.players)-1 {
                server.players = server.players[:i-1]
            } else if i == 0 {
                server.players = server.players[1:]
            } else {
                server.players = append(server.players[:i-1], server.players[:i+1]...)
            }
            return nil
        }
    }
    return errors.New("Plyaer not found")
}

func (server *CenterServer) listPlayer(params string) (players string, err error) {
    server.mutex.Lock()
    defer server.mutex.Unlock()

    if len(server.players) > 0 {
        b, _ := json.Marshal(server.players)
        players = string(b)
    } else {
        err = errors.New("no player online")
    }
    return
}

func (server *CenterServer) broadcast(prams string) error {
    var message Message
    err := json.Unmarshal([]byte(prams), &message)
    if err != nil {
        return err
    }

    server.mutex.Lock()
    defer server.mutex.Unlock()

    if len(server.players) > 0 {
        for _, player := range server.players {
            player.mq <- &message
        }
    } else {
        err = errors.New("No player online")
    }
    return err
}

func (server *CenterServer) Handle(method, params string) *ipc.Response {
    switch method {
    case "addplayer":
        err := server.addPlayer(params)
        if err != nil {
            return &ipc.Response{Code: err.Error()}
        }
    case "removeplayer":
        err := server.removePlayer(params)
        if err != nil {
            return &ipc.Response{Code: err.Error()}
        }
    case "listplayer":
        players, err := server.listPlayer(params)
        if err != nil {
            return &ipc.Response{Code: err.Error()}
        }
        return &ipc.Response{"200", players}
    case "broadcast":
        err := server.broadcast(params)
        if err != nil {
            return &ipc.Response{Code: err.Error()}
        }
        return &ipc.Response{Code: "200"}
    default:
        return &ipc.Response{Code: "404", Body: method + ":" + params}
    }
    return &ipc.Response{Code: "200"}
}

// Name ..
func (server *CenterServer) Name() string {
    return "CenterServer"
}

player.go

package cg

import "fmt"

type Player struct {
    Name  string "name"
    Level int    "level"
    Exp   int    "exp"
    //  Room  int    "rom"
    mq chan *Message
}

func NewPlayer() *Player {
    m := make(chan *Message, 1024)
    player := &Player{"", 0, 0, m}
    go func(p *Player) {
        for {
            msg := <-p.mq
            fmt.Println(p.Name, "received message:", msg.Content)
        }
    }(player)
    return player
}

centerclient.go

package cg

import (
    "encoding/json"
    "errors"
    "ipc"
)

type CenterClient struct {
    *ipc.IpcClient
}

func (client *CenterClient) AddPlayer(player *Player) error {
    b, err := json.Marshal(*player)
    if err != nil {
        return err
    }

    resp, err := client.Call("addplayer", string(b))
    if err == nil && resp.Code == "200" {
        return nil
    }
    return err
}

func (client *CenterClient) RemovePlayer(name string) error {
    ret, _ := client.Call("removeplayer", name)
    if ret.Code == "200" {
        return nil
    }
    return errors.New(ret.Code)
}

func (client *CenterClient) ListPlayer(params string) (ps []*Player, err error) {
    resp, _ := client.Call("listplayer", params)
    if resp.Code != "200" {
        err = errors.New(resp.Code)
        return
    }
    err = json.Unmarshal([]byte(resp.Body), &ps)
    return
}

func (client *CenterClient) Broadcast(message string) error {
    m := &Message{Content: message} //构造message结构体
    b, err := json.Marshal(m)
    if err != nil {
        return nil
    }

    resp, _ := client.Call("broadcast", string(b))
    if resp.Code == "200" {
        return nil
    }
    return errors.New(resp.Code)
}

ipc:client.go

package ipc

import "encoding/json"

type IpcClient struct {
    conn chan string
}

func NewIpcClient(server *IpcServer) *IpcClient {
    c := server.Connect()
    return &IpcClient{c}
}

func (client *IpcClient) Call(method, params string) (resp *Response, err error) {
    req := &Request{method, params}

    var b []byte
    b, err = json.Marshal(req)
    if err != nil {
        return
    }

    client.conn <- string(b)
    str := <-client.conn

    var resp1 Response
    err = json.Unmarshal([]byte(str), &resp1)
    resp = &resp1

    return
}

func (client *IpcClient) Close() {
    client.conn <- "CLOSE"
}

server.og

package ipc

import (
    "encoding/json"
    "fmt"
)

type Request struct {
    Method string "method"
    Params string "params"
}

type Response struct {
    Code string "code"
    Body string "body"
}

type Server interface {
    Name() string
    Handle(method, params string) *Response
}

type IpcServer struct {
    Server
}

func NewIpcServer(server Server) *IpcServer {
    return &IpcServer{server}
}

func (server *IpcServer) Connect() chan string {
    session := make(chan string, 0)
    go func(c chan string) {
        for {
            request := <-c
            if request == "CLOSE" {
                break
            }
            var req Request
            err := json.Unmarshal([]byte(request), &req)
            if err != nil {
                fmt.Println("Invalid request format:", request)
            }

            resp := server.Handle(req.Method, req.Params)
            b, err := json.Marshal(resp)
            c <- string(b)
        }
        fmt.Println("Session closed")
    }(session)
    fmt.Println("A new session has been created successfully")
    return session
}