Каналы в Go
Каналы в go очень помогают при проектировании конкурентного кода, давайте разберемся как с ними работать, начнем)
Что Под Капотом
Канал — это типобезопасная очередь для синхронизации goroutine’ов. Под капотом это структура с внутренним буфером, mutex’ами и условными переменными.
type hchan struct {
qcount uint // кол-во элементов в очереди
dataqsiz uint // размер буфера
buf unsafe.Pointer // указатель на буфер
elemsize uint16 // размер элемента
closed uint32 // флаг закрытия
elemtype *_type // тип элемента
sendx uint // индекс отправки
recvx uint // индекс приёма
recvq waitq // очередь goroutine'ов, ждущих приёма
sendq waitq // очередь goroutine'ов, ждущих отправки
lock mutex
}
При отправке в канал:
- Захватывается lock
- Если есть ждущий receiver — он просыпается, данные копируются напрямую
- Иначе данные пишутся в буфер (если есть место)
- Если буфер полон — goroutine паркуется в sendq
При приёме — всё наоборот.
Типы Каналов и Их Поведение
Небуферизованные (синхронные)
// Блокирует до момента, когда receiver готов
ch := make(chan int)
go func() {
ch <- 42 // ждёт, пока кто-то прочитает
}()
fmt.Println(<-ch) // 42
Когда использовать: Строгая синхронизация, handshake между goroutine’ами, гарантия выполнения.
Буферизованные (асинхронные)
ch := make(chan int, 3)
ch <- 1 // не блокирует
ch <- 2 // не блокирует
ch <- 3 // не блокирует
// ch <- 4 // panic: send on closed channel
fmt.Println(<-ch) // 1
Подводный камень: Если горутина паникует после отправки, данные потеряются. Проверяй закрытие канала.
Directional Channels (только отправка/приём)
func producer(ch chan<- int) {
ch <- 42
}
func consumer(ch <-chan int) {
fmt.Println(<-ch)
}
func main() {
ch := make(chan int)
go producer(ch)
consumer(ch)
}
Плюс: Компилятор гарантирует, что функция не закроет чужой канал.
Select: Многоплексирование
select {
case val := <-ch1:
fmt.Println("from ch1:", val)
case val := <-ch2:
fmt.Println("from ch2:", val)
case ch3 <- result:
fmt.Println("sent to ch3")
case <-time.After(1 * time.Second):
fmt.Println("timeout")
default:
fmt.Println("no channel ready")
}
Как работает: Select ждёт, пока один из case’ов не будет готов. Если готовы несколько — выбирает случайно.
Реальная задача — работер с таймаутом:
func worker(jobs <-chan string, results chan<- string, ctx context.Context) {
for {
select {
case job, ok := <-jobs:
if !ok {
return // канал закрыт
}
results <- process(job)
case <-ctx.Done():
return // контекст отменён
}
}
}
Паттерны
1. Worker Pool с Rate Limiting
type WorkerPool struct {
jobs chan Task
results chan Result
wg sync.WaitGroup
}
type Task struct {
ID int
Data string
}
type Result struct {
TaskID int
Output string
Err error
}
func NewWorkerPool(numWorkers, bufferSize int) *WorkerPool {
return &WorkerPool{
jobs: make(chan Task, bufferSize),
results: make(chan Result, bufferSize),
}
}
func (wp *WorkerPool) Start(ctx context.Context, numWorkers int) {
for i := 0; i < numWorkers; i++ {
wp.wg.Add(1)
go func(workerID int) {
defer wp.wg.Done()
for {
select {
case task, ok := <-wp.jobs:
if !ok {
return
}
// Симуляция работы
time.Sleep(time.Millisecond * 100)
wp.results <- Result{
TaskID: task.ID,
Output: fmt.Sprintf("Processed: %s", task.Data),
}
case <-ctx.Done():
return
}
}
}(i)
}
}
func (wp *WorkerPool) Submit(task Task) {
wp.jobs <- task
}
func (wp *WorkerPool) Close() {
close(wp.jobs)
wp.wg.Wait()
close(wp.results)
}
func (wp *WorkerPool) Results() <-chan Result {
return wp.results
}
// Использование
pool := NewWorkerPool(5, 100)
ctx := context.Background()
pool.Start(ctx, 5)
for i := 0; i < 20; i++ {
pool.Submit(Task{ID: i, Data: fmt.Sprintf("task-%d", i)})
}
pool.Close()
for result := range pool.Results() {
fmt.Printf("Task %d: %s\n", result.TaskID, result.Output)
}
2. Fan-Out / Fan-In (Pipeline)
// Fan-Out: распределение работы
func distribute(input <-chan int, numWorkers int) []<-chan int {
channels := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
ch := make(chan int, 10)
channels[i] = ch
go func(out chan<- int, id int) {
for val := range input {
if val%numWorkers == id {
out <- val * 2
}
}
close(out)
}(ch, i)
}
return channels
}
// Fan-In: сбор результатов
func merge(channels ...<-chan int) <-chan int {
out := make(chan int, 10)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for val := range c {
out <- val
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// Использование
input := make(chan int, 100)
go func() {
for i := 0; i < 100; i++ {
input <- i
}
close(input)
}()
workers := distribute(input, 4)
results := merge(workers...)
for result := range results {
fmt.Println(result)
}
3. Graceful Shutdown
type Server struct {
requests chan *http.Request
done chan struct{}
mu sync.Mutex
closed bool
}
func NewServer() *Server {
return &Server{
requests: make(chan *http.Request, 100),
done: make(chan struct{}),
}
}
func (s *Server) Start() {
go func() {
for {
select {
case req := <-s.requests:
s.handleRequest(req)
case <-s.done:
return
}
}
}()
}
func (s *Server) Submit(req *http.Request) error {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return fmt.Errorf("server closed")
}
s.mu.Unlock()
select {
case s.requests <- req:
return nil
case <-time.After(1 * time.Second):
return fmt.Errorf("timeout")
}
}
func (s *Server) Shutdown(ctx context.Context) error {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return nil
}
s.closed = true
s.mu.Unlock()
// Даём время на обработку текущих запросов
select {
case <-time.After(5 * time.Second):
close(s.done)
case <-ctx.Done():
close(s.done)
return ctx.Err()
}
return nil
}
func (s *Server) handleRequest(req *http.Request) {
time.Sleep(100 * time.Millisecond)
fmt.Printf("Processed: %s\n", req.URL)
}
Deadlock’и и Как их Избежать
Проблема: Циклическая зависимость
// ❌ DEADLOCK
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
<-ch1
ch2 <- 1
}()
ch1 <- 0
<-ch2
Почему: main ждёт ch2, а goroutine ждёт ch1.
Решение: Используй буферизацию или контекст
// ✅ SAFE
ch1 := make(chan int, 1)
ch2 := make(chan int, 1)
ch1 <- 0
ch2 <- 1
<-ch1
<-ch2
Отладка
// Debug: посмотри статус канала через reflect
func chanStatus(ch interface{}) {
v := reflect.ValueOf(ch)
fmt.Printf("Cap: %d, Len: %d\n", v.Cap(), v.Len())
}
// Используй pprof для поиска утечек
import _ "net/http/pprof"
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// Открой http://localhost:6060/debug/pprof/goroutine
Итог
- Небуферизованные - синхронизация
- Буферизованные - decoupling с контролем нагрузки
- Select - многоплексирование + graceful shutdown
- Directional - compile-time safety
Главное: только sender закрывает канал, остальное управляй контекстом и sync.WaitGroup.