Concurrency: Goroutines & Channel

Để hiểu sâu về Concurrency trong Go, cụ thể là GoroutinesChannels, tôi sẽ giải thích chi tiết về khái niệm, cách hoạt động, và cách sử dụng chúng một cách hiệu quả trong lập trình Go, với góc nhìn của một Senior Golang Engineer. Tôi sẽ trình bày rõ ràng, súc tích, và kèm theo các ví dụ thực tế.


1. Concurrency trong Go là gì?

Concurrency (đồng thời) là khả năng thực hiện nhiều tác vụ một cách độc lập, nhưng không nhất thiết phải song song (parallel). Go được thiết kế với concurrency là trọng tâm, giúp xử lý các tác vụ đồng thời một cách dễ dàng và hiệu quả. Hai công cụ chính hỗ trợ concurrency trong Go là:

  • Goroutines: Một đơn vị thực thi nhẹ (lightweight thread) được quản lý bởi runtime của Go.

  • Channels: Cơ chế giao tiếp an toàn giữa các goroutine để đồng bộ hóa và truyền dữ liệu.


2. Goroutines

Goroutine là gì?

Goroutine là một hàm hoặc phương thức được chạy đồng thời trong một không gian thực thi riêng biệt, được quản lý bởi Go runtime (không phải hệ điều hành). Goroutine rất nhẹ, chỉ chiếm vài KB bộ nhớ, cho phép bạn chạy hàng nghìn hoặc thậm chí hàng triệu goroutine mà không gây áp lực lớn lên hệ thống.

Cách tạo Goroutine

Sử dụng từ khóa go trước một hàm hoặc closure để khởi chạy một goroutine:

package main

import (
    "fmt"
    "time"
)

func printMessage(msg string) {
    for i := 0; i < 3; i++ {
        fmt.Println(msg, i)
        time.Sleep(100 * time.Millisecond) // Giả lập công việc tốn thời gian
    }
}

func main() {
    go printMessage("Goroutine 1") // Chạy trong goroutine
    printMessage("Main")          // Chạy trong main goroutine
    time.Sleep(1 * time.Second)   // Đợi để các goroutine hoàn thành
}

Giải thích:

  • go printMessage("Goroutine 1") khởi chạy hàm printMessage trong một goroutine mới.

  • main là một goroutine đặc biệt (main goroutine). Nếu main goroutine kết thúc, tất cả các goroutine khác cũng sẽ bị hủy.

  • time.Sleep được sử dụng để đảm bảo main goroutine không thoát trước khi các goroutine khác hoàn thành (trong thực tế, bạn nên dùng cơ chế đồng bộ hóa như sync.WaitGroup hoặc channels).

Đặc điểm của Goroutines

  1. Lightweight: Goroutine có kích thước stack ban đầu nhỏ (khoảng 2KB) và có thể mở rộng động.

  2. Quản lý bởi Go Runtime: Go sử dụng scheduler riêng để phân phối goroutines trên các thread của hệ điều hành, tối ưu hóa hiệu suất.

  3. Không cần quản lý thread thủ công: Go tự động xử lý việc lập lịch (scheduling) và phân bổ tài nguyên.

Lưu ý khi dùng Goroutines

  • Đồng bộ hóa: Goroutines chạy độc lập, vì vậy cần cơ chế đồng bộ hóa để tránh race condition hoặc đảm bảo thứ tự thực thi.

  • Race Condition: Khi nhiều goroutine truy cập và sửa đổi cùng một biến, cần sử dụng sync.Mutex hoặc channels để bảo vệ dữ liệu.

  • Main Goroutine: Nếu hàm main kết thúc, tất cả goroutines sẽ bị hủy, ngay cả khi chúng chưa hoàn thành.

Ví dụ với sync.WaitGroup

Dùng sync.WaitGroup để đồng bộ hóa các goroutine:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // Giảm bộ đếm khi hoàn thành
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 3; i++ {
        wg.Add(1) // Tăng bộ đếm cho mỗi goroutine
        go worker(i, &wg)
    }
    wg.Wait() // Chờ tất cả goroutine hoàn thành
    fmt.Println("All workers done")
}

Giải thích:

  • sync.WaitGroup giúp main goroutine đợi tất cả các worker hoàn thành.

  • wg.Add(1) tăng bộ đếm, wg.Done() giảm bộ đếm, và wg.Wait() chặn cho đến khi bộ đếm về 0.


3. Channels

Channel là gì?

Channel là cơ chế giao tiếp an toàn giữa các goroutine, cho phép gửi và nhận dữ liệu. Channel hoạt động như một "ống dẫn" (pipe) để truyền dữ liệu giữa các goroutine mà không cần khóa (lock) hoặc cơ chế đồng bộ hóa phức tạp.

Các loại Channel

  1. Unbuffered Channel: Chỉ gửi dữ liệu khi cả sender và receiver đều sẵn sàng (synchronous).

  2. Buffered Channel: Có thể chứa một số lượng dữ liệu giới hạn, cho phép gửi dữ liệu mà không cần receiver ngay lập tức.

Cách tạo và sử dụng Channel

  • Tạo channel: ch := make(chan Type) (unbuffered) hoặc ch := make(chan Type, capacity) (buffered).

  • Gửi dữ liệu: ch <- value

  • Nhận dữ liệu: value := <-ch

  • Đóng channel: close(ch)

Ví dụ với Unbuffered Channel

package main

import (
    "fmt"
    "time"
)

func sender(ch chan string) {
    time.Sleep(time.Second)
    ch <- "Hello from sender!" // Gửi dữ liệu
}

func main() {
    ch := make(chan string) // Unbuffered channel
    go sender(ch)
    msg := <-ch // Nhận dữ liệu (chặn cho đến khi có dữ liệu)
    fmt.Println(msg)
}

Giải thích:

  • make(chan string) tạo một unbuffered channel.

  • sender gửi dữ liệu vào channel sau 1 giây.

  • main đợi nhận dữ liệu từ channel (chặn cho đến khi dữ liệu được gửi).

Ví dụ với Buffered Channel

package main

import "fmt"

func main() {
    ch := make(chan int, 2) // Buffered channel với capacity = 2
    ch <- 1                // Gửi 1
    ch <- 2                // Gửi 2
    fmt.Println(<-ch)      // Nhận: 1
    fmt.Println(<-ch)      // Nhận: 2
}

Giải thích:

  • Buffered channel cho phép gửi dữ liệu mà không cần receiver ngay lập tức, miễn là chưa vượt quá capacity.

  • Nếu gửi quá capacity mà không có receiver, sẽ gây deadlock.

Đóng Channel và Range

package main

import "fmt"

func producer(ch chan int) {
    for i := 1; i <= 3; i++ {
        ch <- i
    }
    close(ch) // Đóng channel khi hoàn thành
}

func main() {
    ch := make(chan int)
    go producer(ch)
    for v := range ch { // Lặp qua channel cho đến khi đóng
        fmt.Println(v)
    }
}

Giải thích:

  • close(ch) báo hiệu rằng không còn dữ liệu nào được gửi nữa.

  • for v := range ch lặp qua channel và dừng khi channel đóng.

Select Statement

Câu lệnh select cho phép xử lý nhiều channel cùng lúc, tương tự như switch nhưng dành cho channel:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "from ch1"
    }()
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "from ch2"
    }()

    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println(msg1)
        case msg2 := <-ch2:
            fmt.Println(msg2)
        }
    }
}

Giải thích:

  • select chờ trên nhiều channel và xử lý channel nào sẵn sàng trước.

  • Nếu nhiều channel sẵn sàng, select chọn ngẫu nhiên một case.


4. Patterns sử dụng Goroutines và Channels

Worker Pool

Sử dụng nhiều goroutines để xử lý công việc từ một channel:

package main

import (
    "fmt"
    "sync"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        results <- job * 2 // Giả lập công việc
    }
}

func main() {
    jobs := make(chan int, 5)
    results := make(chan int, 5)
    var wg sync.WaitGroup

    // Khởi chạy 3 worker
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }

    // Gửi công việc
    for i := 1; i <= 5; i++ {
        jobs <- i
    }
    close(jobs)

    // Đợi các worker hoàn thành
    go func() {
        wg.Wait()
        close(results)
    }()

    // Nhận kết quả
    for result := range results {
        fmt.Println("Result:", result)
    }
}

Giải thích:

  • jobs là channel chứa các công việc.

  • results là channel chứa kết quả.

  • Mỗi worker lấy công việc từ jobs, xử lý, và gửi kết quả vào results.

Fan-out, Fan-in

  • Fan-out: Phân phối công việc từ một channel đến nhiều goroutine.

  • Fan-in: Thu thập kết quả từ nhiều channel vào một channel duy nhất.

package main

import (
    "fmt"
    "sync"
)

func producer(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func merge(cs ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    wg.Add(len(cs))

    for _, c := range cs {
        go func(c <-chan int) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    in := producer(1, 2, 3, 4)
    c1 := square(in)
    c2 := square(in) // Fan-out: hai pipeline xử lý cùng input
    for n := range merge(c1, c2) { // Fan-in: gộp kết quả
        fmt.Println(n)
    }
}

5. Lưu ý quan trọng

  1. Deadlock: Xảy ra khi tất cả goroutines bị chặn (ví dụ: gửi/nhận trên channel mà không có đối tác). Dùng select hoặc đảm bảo đóng channel đúng cách để tránh.

  2. Race Condition: Sử dụng go run -race để phát hiện race condition khi nhiều goroutine truy cập dữ liệu chung.

  3. Channel Direction: Sử dụng channel có hướng (chan<- cho gửi, <-chan cho nhận) để tăng tính an toàn và rõ ràng.

  4. Context: Dùng context.Context để hủy hoặc giới hạn thời gian chạy của goroutines trong các ứng dụng thực tế.


6. Khi nào dùng Goroutines và Channels?

  • Goroutines: Khi cần thực hiện nhiều tác vụ đồng thời (ví dụ: xử lý request HTTP, crawl web, xử lý batch job).

  • Channels: Khi cần giao tiếp hoặc đồng bộ hóa giữa các goroutine (ví dụ: truyền dữ liệu, điều phối worker pool).


7. Tài liệu tham khảo

  • Go Concurrency Patterns: Xem bài viết của Rob Pike hoặc tài liệu chính thức tại golang.org.

  • Effective Go: Hướng dẫn sử dụng goroutines và channels hiệu quả.

  • Go by Example: Các ví dụ thực tế về concurrency.

Last updated