Назад
Golang

Каналы в Go

Каналы в go очень помогают при проектировании конкурентного кода, давайте разберемся как с ними работать, начнем)

Что Под Капотом

Канал — это типобезопасная очередь для синхронизации goroutine’ов. Под капотом это структура с внутренним буфером, mutex’ами и условными переменными.

type hchan struct {
    qcount   uint           // кол-во элементов в очереди
    dataqsiz uint           // размер буфера
    buf      unsafe.Pointer // указатель на буфер
    elemsize uint16         // размер элемента
    closed   uint32         // флаг закрытия
    elemtype *_type         // тип элемента
    sendx    uint           // индекс отправки
    recvx    uint           // индекс приёма
    recvq    waitq          // очередь goroutine'ов, ждущих приёма
    sendq    waitq          // очередь goroutine'ов, ждущих отправки
    lock     mutex
}

При отправке в канал:

  1. Захватывается lock
  2. Если есть ждущий receiver — он просыпается, данные копируются напрямую
  3. Иначе данные пишутся в буфер (если есть место)
  4. Если буфер полон — goroutine паркуется в sendq

При приёме — всё наоборот.

Типы Каналов и Их Поведение

Небуферизованные (синхронные)

// Блокирует до момента, когда receiver готов
ch := make(chan int)

go func() {
    ch <- 42  // ждёт, пока кто-то прочитает
}()

fmt.Println(<-ch)  // 42

Когда использовать: Строгая синхронизация, handshake между goroutine’ами, гарантия выполнения.

Буферизованные (асинхронные)

ch := make(chan int, 3)

ch <- 1  // не блокирует
ch <- 2  // не блокирует
ch <- 3  // не блокирует
// ch <- 4  // panic: send on closed channel

fmt.Println(<-ch)  // 1

Подводный камень: Если горутина паникует после отправки, данные потеряются. Проверяй закрытие канала.

Directional Channels (только отправка/приём)

func producer(ch chan<- int) {
    ch <- 42
}

func consumer(ch <-chan int) {
    fmt.Println(<-ch)
}

func main() {
    ch := make(chan int)
    go producer(ch)
    consumer(ch)
}

Плюс: Компилятор гарантирует, что функция не закроет чужой канал.

Select: Многоплексирование

select {
case val := <-ch1:
    fmt.Println("from ch1:", val)
case val := <-ch2:
    fmt.Println("from ch2:", val)
case ch3 <- result:
    fmt.Println("sent to ch3")
case <-time.After(1 * time.Second):
    fmt.Println("timeout")
default:
    fmt.Println("no channel ready")
}

Как работает: Select ждёт, пока один из case’ов не будет готов. Если готовы несколько — выбирает случайно.

Реальная задача — работер с таймаутом:

func worker(jobs <-chan string, results chan<- string, ctx context.Context) {
    for {
        select {
        case job, ok := <-jobs:
            if !ok {
                return  // канал закрыт
            }
            results <- process(job)
        case <-ctx.Done():
            return  // контекст отменён
        }
    }
}

Паттерны

1. Worker Pool с Rate Limiting

type WorkerPool struct {
    jobs    chan Task
    results chan Result
    wg      sync.WaitGroup
}

type Task struct {
    ID   int
    Data string
}

type Result struct {
    TaskID int
    Output string
    Err    error
}

func NewWorkerPool(numWorkers, bufferSize int) *WorkerPool {
    return &WorkerPool{
        jobs:    make(chan Task, bufferSize),
        results: make(chan Result, bufferSize),
    }
}

func (wp *WorkerPool) Start(ctx context.Context, numWorkers int) {
    for i := 0; i < numWorkers; i++ {
        wp.wg.Add(1)
        go func(workerID int) {
            defer wp.wg.Done()
            for {
                select {
                case task, ok := <-wp.jobs:
                    if !ok {
                        return
                    }
                    // Симуляция работы
                    time.Sleep(time.Millisecond * 100)
                    wp.results <- Result{
                        TaskID: task.ID,
                        Output: fmt.Sprintf("Processed: %s", task.Data),
                    }
                case <-ctx.Done():
                    return
                }
            }
        }(i)
    }
}

func (wp *WorkerPool) Submit(task Task) {
    wp.jobs <- task
}

func (wp *WorkerPool) Close() {
    close(wp.jobs)
    wp.wg.Wait()
    close(wp.results)
}

func (wp *WorkerPool) Results() <-chan Result {
    return wp.results
}

// Использование
pool := NewWorkerPool(5, 100)
ctx := context.Background()
pool.Start(ctx, 5)

for i := 0; i < 20; i++ {
    pool.Submit(Task{ID: i, Data: fmt.Sprintf("task-%d", i)})
}

pool.Close()

for result := range pool.Results() {
    fmt.Printf("Task %d: %s\n", result.TaskID, result.Output)
}

2. Fan-Out / Fan-In (Pipeline)

// Fan-Out: распределение работы
func distribute(input <-chan int, numWorkers int) []<-chan int {
    channels := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        ch := make(chan int, 10)
        channels[i] = ch
        
        go func(out chan<- int, id int) {
            for val := range input {
                if val%numWorkers == id {
                    out <- val * 2
                }
            }
            close(out)
        }(ch, i)
    }
    return channels
}

// Fan-In: сбор результатов
func merge(channels ...<-chan int) <-chan int {
    out := make(chan int, 10)
    var wg sync.WaitGroup
    
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for val := range c {
                out <- val
            }
        }(ch)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

// Использование
input := make(chan int, 100)
go func() {
    for i := 0; i < 100; i++ {
        input <- i
    }
    close(input)
}()

workers := distribute(input, 4)
results := merge(workers...)

for result := range results {
    fmt.Println(result)
}

3. Graceful Shutdown

type Server struct {
    requests chan *http.Request
    done     chan struct{}
    mu       sync.Mutex
    closed   bool
}

func NewServer() *Server {
    return &Server{
        requests: make(chan *http.Request, 100),
        done:     make(chan struct{}),
    }
}

func (s *Server) Start() {
    go func() {
        for {
            select {
            case req := <-s.requests:
                s.handleRequest(req)
            case <-s.done:
                return
            }
        }
    }()
}

func (s *Server) Submit(req *http.Request) error {
    s.mu.Lock()
    if s.closed {
        s.mu.Unlock()
        return fmt.Errorf("server closed")
    }
    s.mu.Unlock()
    
    select {
    case s.requests <- req:
        return nil
    case <-time.After(1 * time.Second):
        return fmt.Errorf("timeout")
    }
}

func (s *Server) Shutdown(ctx context.Context) error {
    s.mu.Lock()
    if s.closed {
        s.mu.Unlock()
        return nil
    }
    s.closed = true
    s.mu.Unlock()
    
    // Даём время на обработку текущих запросов
    select {
    case <-time.After(5 * time.Second):
        close(s.done)
    case <-ctx.Done():
        close(s.done)
        return ctx.Err()
    }
    return nil
}

func (s *Server) handleRequest(req *http.Request) {
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("Processed: %s\n", req.URL)
}

Deadlock’и и Как их Избежать

Проблема: Циклическая зависимость

// ❌ DEADLOCK
ch1 := make(chan int)
ch2 := make(chan int)

go func() {
    <-ch1
    ch2 <- 1
}()

ch1 <- 0
<-ch2

Почему: main ждёт ch2, а goroutine ждёт ch1.

Решение: Используй буферизацию или контекст

// ✅ SAFE
ch1 := make(chan int, 1)
ch2 := make(chan int, 1)

ch1 <- 0
ch2 <- 1

<-ch1
<-ch2

Отладка

// Debug: посмотри статус канала через reflect
func chanStatus(ch interface{}) {
    v := reflect.ValueOf(ch)
    fmt.Printf("Cap: %d, Len: %d\n", v.Cap(), v.Len())
}

// Используй pprof для поиска утечек
import _ "net/http/pprof"

go func() {
    log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// Открой http://localhost:6060/debug/pprof/goroutine

Итог

  • Небуферизованные - синхронизация
  • Буферизованные - decoupling с контролем нагрузки
  • Select - многоплексирование + graceful shutdown
  • Directional - compile-time safety

Главное: только sender закрывает канал, остальное управляй контекстом и sync.WaitGroup.