Initial commit
This commit is contained in:
0
hw05_parallel_execution/.sync
Normal file
0
hw05_parallel_execution/.sync
Normal file
84
hw05_parallel_execution/README.md
Normal file
84
hw05_parallel_execution/README.md
Normal file
@@ -0,0 +1,84 @@
|
||||
## Домашнее задание №5 «Параллельное исполнение»
|
||||
Необходимо написать функцию для параллельного выполнения заданий в n параллельных горутинах:
|
||||
* количество создаваемых горутин не должно зависеть от числа заданий, т.е. функция должна запускать n горутин для конкурентной обработки заданий и, возможно, еще несколько вспомогательных горутин;
|
||||
* функция должна останавливать свою работу, если произошло m ошибок;
|
||||
* после завершения работы функции (успешного или из-за превышения m) не должно оставаться работающих горутин.
|
||||
|
||||
Нужно учесть, что задания могут выполняться разное время, а длина списка задач
|
||||
`len(tasks)` может быть больше или меньше n.
|
||||
|
||||
Значение m <= 0 трактуется на усмотрение программиста:
|
||||
- или это знак игнорировать ошибки в принципе;
|
||||
- или считать это как "максимум 0 ошибок", значит функция всегда будет возвращать
|
||||
`ErrErrorsLimitExceeded`;
|
||||
- на эту логику следует написать юнит-тест.
|
||||
|
||||
Граничные случаи:
|
||||
* если задачи работают без ошибок, то выполнятся `len(tasks)` задач, т.е. все задачи;
|
||||
* если в первых выполненных m задачах (или вообще всех) происходят ошибки, то всего выполнится не более n+m задач.
|
||||
|
||||
|
||||
**(*) Дополнительное задание: написать тест на concurrency без time.Sleep**
|
||||
|
||||
Придумайте тест, который проверит concurrency другим способом.
|
||||
|
||||
Текущий тест "tasks without errors" использует time.Sleep и подсчет времени выполнения, чтобы сделать вывод о конкурентности использования. Проблема тестов на слипчиках в том, что на CI часто не хватает CPU и подобные тесты работают нестабильно.
|
||||
|
||||
Подсказка: используйте `require.Eventually`.
|
||||
|
||||
---
|
||||
#### Пример
|
||||
Имеем 10 задач, n=4 воркера, m=2 ошибки.
|
||||
- Запускаем:
|
||||
```
|
||||
--------------ok (узнал, что лимит превышен и остановился)
|
||||
-----------err
|
||||
-------err
|
||||
--------------------ok
|
||||
```
|
||||
Выполнится 4 задачи (2 успешно) <= 4 + 2, остальные задачи не берем.
|
||||
|
||||
- Другая ситуация, работающие воркеры успели еще взять задач:
|
||||
```
|
||||
------ok--------ok (узнал, что лимит превышен и остановился)
|
||||
-----------err
|
||||
---err
|
||||
--------ok-------ok
|
||||
```
|
||||
Выполнится 6 задач (4 успешно) <= 4 + 2, остальные задачи не берем.
|
||||
|
||||
- Ошибок не было:
|
||||
```
|
||||
-------ok-----ok-----ok-----ok (1 воркер выполнил 4 задачи)
|
||||
-----------ok-------------ok (2 воркер выполнил 2 задачи)
|
||||
-----ok---------ok---------ok (3 воркер выполнил 3 задачи)
|
||||
--------------------ok (4 воркер выполнил 1 задачу)
|
||||
```
|
||||
Выполнится 10 задач (10 успешно): задач не осталось, воркеры остановились.
|
||||
|
||||
---
|
||||
|
||||
При необходимости можно выделять дополнительные функции / ошибки.
|
||||
|
||||
### Критерии оценки
|
||||
- Пайплайн зелёный - 4 балла
|
||||
- Добавлены новые юнит-тесты - до 4 баллов
|
||||
- Понятность и чистота кода - до 2 баллов
|
||||
|
||||
#### Зачёт от 7 баллов
|
||||
|
||||
### Подсказки
|
||||
- https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
|
||||
- `sync.WaitGroup`
|
||||
- `go test -v -race -count=100 .`
|
||||
|
||||
### Частые ошибки
|
||||
1) `racedetector` ругается на строчку с ассертом в тестах:
|
||||
- простой случай: после выхода из `Run` остаются висячие горутины, отсюда и получаем `data race` -
|
||||
ассерт в тестах неатомарно обращается к `runTasksCount`, в то время как зомби-горутины атомарно пытаюся её поменять.
|
||||
- случай посложнее: один тест завершается успешно, но висячие горутины, им порожденные, аффектят ассерты в
|
||||
последующих тестах.
|
||||
2) Запускаются лишние горутины (инструкции `go`). Их количество за все время работы `Run` должно быть n (плюс, возможно, еще одна-две, если по другому не получается). В некоторых решениях ошибочно контроллируется количество одновременно работающих, а не общее количество.
|
||||
|
||||
**Решение**: внимательно посмотреть места выхода из функции и гарантировать, что все порождённые вами горутины
|
||||
завершились к этому моменту.
|
||||
16
hw05_parallel_execution/go.mod
Normal file
16
hw05_parallel_execution/go.mod
Normal file
@@ -0,0 +1,16 @@
|
||||
module github.com/fixme_my_friend/hw05_parallel_execution
|
||||
|
||||
go 1.19
|
||||
|
||||
require (
|
||||
github.com/stretchr/testify v1.7.0
|
||||
go.uber.org/goleak v1.1.10
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
|
||||
golang.org/x/tools v0.1.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
|
||||
)
|
||||
55
hw05_parallel_execution/go.sum
Normal file
55
hw05_parallel_execution/go.sum
Normal file
@@ -0,0 +1,55 @@
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
|
||||
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI=
|
||||
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
|
||||
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k=
|
||||
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY=
|
||||
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
15
hw05_parallel_execution/run.go
Normal file
15
hw05_parallel_execution/run.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package hw05parallelexecution
|
||||
|
||||
import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
var ErrErrorsLimitExceeded = errors.New("errors limit exceeded")
|
||||
|
||||
type Task func() error
|
||||
|
||||
// Run starts tasks in n goroutines and stops its work when receiving m errors from tasks.
|
||||
func Run(tasks []Task, n, m int) error {
|
||||
// Place your code here.
|
||||
return nil
|
||||
}
|
||||
70
hw05_parallel_execution/run_test.go
Normal file
70
hw05_parallel_execution/run_test.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package hw05parallelexecution
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/goleak"
|
||||
)
|
||||
|
||||
func TestRun(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
t.Run("if were errors in first M tasks, than finished not more N+M tasks", func(t *testing.T) {
|
||||
tasksCount := 50
|
||||
tasks := make([]Task, 0, tasksCount)
|
||||
|
||||
var runTasksCount int32
|
||||
|
||||
for i := 0; i < tasksCount; i++ {
|
||||
err := fmt.Errorf("error from task %d", i)
|
||||
tasks = append(tasks, func() error {
|
||||
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
|
||||
atomic.AddInt32(&runTasksCount, 1)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
workersCount := 10
|
||||
maxErrorsCount := 23
|
||||
err := Run(tasks, workersCount, maxErrorsCount)
|
||||
|
||||
require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err)
|
||||
require.LessOrEqual(t, runTasksCount, int32(workersCount+maxErrorsCount), "extra tasks were started")
|
||||
})
|
||||
|
||||
t.Run("tasks without errors", func(t *testing.T) {
|
||||
tasksCount := 50
|
||||
tasks := make([]Task, 0, tasksCount)
|
||||
|
||||
var runTasksCount int32
|
||||
var sumTime time.Duration
|
||||
|
||||
for i := 0; i < tasksCount; i++ {
|
||||
taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
|
||||
sumTime += taskSleep
|
||||
|
||||
tasks = append(tasks, func() error {
|
||||
time.Sleep(taskSleep)
|
||||
atomic.AddInt32(&runTasksCount, 1)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
workersCount := 5
|
||||
maxErrorsCount := 1
|
||||
|
||||
start := time.Now()
|
||||
err := Run(tasks, workersCount, maxErrorsCount)
|
||||
elapsedTime := time.Since(start)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed")
|
||||
require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?")
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user