読者です 読者をやめる 読者になる 読者になる

at kaneshin

Free space for me.

golang の channel を使って Dispatcher-Worker を作り goroutine 爆発させないようにする

golang で処理の高速化をするために goroutine/channel を使って並行処理にし、待ち時間を無駄にしないようにするのは言葉で表すのは簡単ですが、実際にパターンとして落としこむためには経験が必要だなと思うので、今回 Dispatcher-Worker として Job Queue を golang で実装する方法を紹介したいと思います。

この記事は mattn さんの Big Sky :: golang の channel を使ったテクニックあれこれ の次のステップとして読むことをオススメします。

mattn.kaoriya.net

golang で作成したアプリケーションで多くのリクエストをアプリケーションが送受信する必要がある場合、高速に捌くために並行処理にして非同期化を図る場合を想定しています。

今回は get という関数でHTTPリクエストを実行して取得したデータのサイズとそのときの goroutine の数を出力するようにしています。

func get(url string) {
    resp, err := http.DefaultClient.Get(url)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Goroutine:%d, URL:%s (%d bytes)", runtime.NumGoroutine(), url, len(body))
}

非同期処理への単純アプローチ

TL;DR https://gist.github.com/kaneshin/c589b958592b4b685accc48249bf4b41

「並行処理するためには go をつければいいんですよね?」という感覚で下記のように実装してしまうと、大規模アプリケーションでは即座に破綻します。 goroutine の数が処理の数に応じて上昇するためです。

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            get(fmt.Sprintf("http://placehold.it/%dx%d", i, i))
        }(i)
    }
    wg.Wait()
}

これのログを見てみると、for文のカウンターに依存して goroutine の数が増えているのがわかります。

...
2016/08/18 17:05:27 Goroutine:104, URL:http://placehold.it/31x31 (2843 bytes)
2016/08/18 17:05:27 Goroutine:101, URL:http://placehold.it/32x32 (2901 bytes)
2016/08/18 17:05:27 Goroutine:98, URL:http://placehold.it/17x17 (2879 bytes) 
2016/08/18 17:05:27 Goroutine:95, URL:http://placehold.it/83x83 (3495 bytes) 
2016/08/18 17:05:27 Goroutine:92, URL:http://placehold.it/57x57 (3197 bytes) 
2016/08/18 17:05:27 Goroutine:89, URL:http://placehold.it/25x25 (2923 bytes) 
2016/08/18 17:05:27 Goroutine:86, URL:http://placehold.it/95x95 (3586 bytes) 
...

今回の Dispatcher-Worker ではこの goroutine を worker の数で制限できるような方針です。

Dispatcher-Worker でのアプローチ

TL;DR: https://gist.github.com/kaneshin/69bd13c7b57ba8bac84fb4de0098b5fc

Dispatcher-Worker のイメージは下記の通りで、dispatcher の方で Idle 状態の worker にキューイングされているメッセージを渡します。

f:id:laplus-knsn:20160818181940p:plain

Dispatcher と Worker

dispatcher ではメッセージのキューイングと Idle 状態の worker を管理できるようにします。

type (
    // Dispatcher represents a management workers.
    Dispatcher struct {
        pool    chan *worker     // Idle 状態の worker の受け入れ先
        queue   chan interface{} // メッセージの受け入れ先
        workers []*worker
        wg      sync.WaitGroup   // 非同期処理の待機用
        quit    chan struct{}
    }
)

worker は処理するメッセージを受け取れるようにします。

type (
    // worker represents the worker that executes the job.
    worker struct {
        dispatcher *Dispatcher
        data       chan interface{} // 受け取ったメッセージの受信先
        quit       chan struct{}
    }
)

Dispatcher と Worker の起動

Dispatcher と Worker は起動させてからはメッセージを受け取り次第勝手に処理を実行するようになります。非同期処理の終了を待機できるようにしておくように sync.WaitGroup もうまくハンドリングします。

Dispatcher はメッセージがキューイングされたとき、 Idle になっている worker にメッセージを送信します。

// Start starts the specified dispatcher but does not wait for it to complete.
func (d *Dispatcher) Start() {
    for _, w := range d.workers {
        w.start()
    }

    go func() {
        for {
            select {
            // メッセージがキューイングされた場合、 v にメッセージを設定
            case v := <-d.queue:
                (<-d.pool).data <- v // 下記の2行と同じ意味
                // worker := <-d.pool // d.pool から Idle の worker がpoolingされるまで待機
                // worker.data <- v // worker.data に メッセージ v を送信

            case <-d.quit:
                return
            }
        }
    }()
}

worker は自身が Idle になり次第自身をプーリングすることと、メッセージを受信したら処理を実施します。

func (w *worker) start() {
    go func() {
        for {
            // dispatcher の pool に自身を送信する(Idle状態を示す)
            w.dispatcher.pool <- w

            select {
            // メッセージがキューイングされた場合、 v にメッセージを設定
            case v := <-w.data:
                if str, ok := v.(string); ok {
                    // get 関数でHTTPリクエスト
                    get(str)
                }

                // WaitGroupのカウントダウン
                w.dispatcher.wg.Done()

            case <-w.quit:
                return
            }
        }
    }()
}

メッセージのキューイング

外からのメッセージをキューイングするのは Dispatcher だけですので、 Dispatcher にエンキュー用の関数を用意します。

// Add adds a given value to the queue of the dispatcher.
func (d *Dispatcher) Add(v interface{}) {
    // キューイングされた場合に処理を待機するために WaitGroup をカウントアップ
    d.wg.Add(1)
    d.queue <- v
}

Dispatcher の待機

非同期処理が終わる前に main の実行を終了してしまった場合、キューイングしたメッセージが処理されないで終了してしまいます。そうならないように Wait 関数を用意します。

// Wait waits for the dispatcher to exit. It must have been started by Start.
func (d *Dispatcher) Wait() {
    d.wg.Wait()
}

単純に sync.WaitGroup の Wait 関数で待機するだけです。

Dispatcher と Worker の初期化と実行

ここまで実装が完了すれば後は初期化を行い、実行するのみです。

const (
    maxWorkers = 3
    maxQueues  = 10000
)

// NewDispatcher returns a pointer of Dispatcher.
func NewDispatcher() *Dispatcher {
    // dispatcher の初期化
    d := &Dispatcher{
        pool:  make(chan *worker, maxWorkers),    // capacity は用意する worker の数
        queue: make(chan interface{}, maxQueues), // capacity はメッセージをキューイングする数
        quit:  make(chan struct{}),
    }

    // worker の初期化
    d.workers = make([]*worker, cap(d.pool))
    for i := 0; i < cap(d.pool); i++ {
        w := worker{
            dispatcher: d,
            data:       make(chan interface{}), // worker でキューイングする場合は capacity を2以上
            quit:       make(chan struct{}),
        }
        d.workers[i] = &w
    }
    return d
}

func main() {
    d := NewDispatcher()

    d.Start()
    for i := 0; i < 100; i++ {
        url := fmt.Sprintf("http://placehold.it/%dx%d", i, i)
        d.Add(url)
    }
    d.Wait()
}

これを実行してみると、 goroutine の数が for文のカウンターではなく、 worker の数に依存しているのがわかります。

...
2016/08/18 18:54:06 Goroutine:22, URL:http://placehold.it/77x77 (3321 bytes)   
2016/08/18 18:54:06 Goroutine:22, URL:http://placehold.it/78x78 (3431 bytes)   
2016/08/18 18:54:06 Goroutine:20, URL:http://placehold.it/79x79 (3427 bytes)   
2016/08/18 18:54:06 Goroutine:16, URL:http://placehold.it/81x81 (3296 bytes)   
2016/08/18 18:54:06 Goroutine:16, URL:http://placehold.it/80x80 (3478 bytes)   
...

コード全体はこちらにあります。

今回の worker はシンプルに実装しているので、worker 側でキューイングすることはしていないですが、延長で実装が可能なので是非試してみて下さい。

おわりに

mattn さんも Big Sky :: golang の channel を使ったテクニックあれこれ の中で話していますが、 goroutine と channel の可能性はかなりあると思っており、今回の一例も単なるベースに過ぎずこれ以上の実装はまだまだ可能です。慣れないと使い処に困る goroutine と channel かもしれませんが、使いこなすことが出来れば実装の視野も広がると思います。是非、この夏に goroutine/channel を使いこなせるようになりましょう。