Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions examples/parallel_merge_sort/parallel_merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"

gtf "github.com/noneback/go-taskflow"
"github.com/noneback/go-taskflow/utils"
)

// merge sorted src to sorted dest
Expand All @@ -36,18 +37,23 @@ func mergeInto(dest, src []int) []int {
return tmp
}
func main() {
size := 100
radomArr := make([][]int, 10)
sortedArr := make([]int, 0, 10*size)
pprof := utils.NewPprofUtils(utils.CPU, "./out.prof")
pprof.StartProfile()
defer pprof.StopProfile()

size := 10000
share := 1000
randomArr := make([][]int, share)
sortedArr := make([]int, 0, share*size)
mutex := &sync.Mutex{}

for i := 0; i < 10; i++ {
for i := 0; i < share; i++ {
for j := 0; j < size; j++ {
radomArr[i] = append(radomArr[i], rand.Int())
randomArr[i] = append(randomArr[i], rand.Int())
}
}

sortTasks := make([]*gtf.Task, 10)
sortTasks := make([]*gtf.Task, share)
tf := gtf.NewTaskFlow("merge sort")
done := tf.NewTask("Done", func() {
if !slices.IsSorted(sortedArr) {
Expand All @@ -57,19 +63,19 @@ func main() {
fmt.Println(sortedArr[:1000])
})

for i := 0; i < 10; i++ {
sortTasks[i] = tf.NewTask("sort_"+strconv.Itoa(i), func() {
arr := radomArr[i]
for i := 0; i < share; i++ {
idx := i
sortTasks[idx] = tf.NewTask("sort_"+strconv.Itoa(idx), func() {
arr := randomArr[idx]
slices.Sort(arr)
mutex.Lock()
defer mutex.Unlock()
sortedArr = mergeInto(sortedArr, arr)
})

}
done.Succeed(sortTasks...)

executor := gtf.NewExecutor(1000)
executor := gtf.NewExecutor(1000000)

executor.Run(tf).Wait()

Expand Down
2 changes: 1 addition & 1 deletion examples/priority/priority.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package priority
package main

import (
"fmt"
Expand Down
58 changes: 58 additions & 0 deletions utils/pprof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package utils

import (
"os"
"runtime/pprof"
)

type ProfileType int

type PprofUtils struct {
f *os.File
profile ProfileType
}

func NewPprofUtils(profile ProfileType, output string) *PprofUtils {
p := &PprofUtils{}
f, err := os.Create(output)
if err != nil {
panic(err)
}
p.f = f
p.profile = profile

return p
}

const (
CPU ProfileType = iota
HEAP
)

func (p *PprofUtils) StartProfile() {
switch p.profile {
case CPU:
if err := pprof.StartCPUProfile(p.f); err != nil {
panic(err)

Check warning on line 36 in utils/pprof.go

View check run for this annotation

Codecov / codecov/patch

utils/pprof.go#L36

Added line #L36 was not covered by tests
}
case HEAP:
if err := pprof.WriteHeapProfile(p.f); err != nil {
panic(err)

Check warning on line 40 in utils/pprof.go

View check run for this annotation

Codecov / codecov/patch

utils/pprof.go#L40

Added line #L40 was not covered by tests
}
default:
panic("unsupported profile type")
}

}

func (p *PprofUtils) StopProfile() {
defer p.f.Close()

switch p.profile {
case CPU:
pprof.StopCPUProfile()
case HEAP:
default:
panic("unsupported profile type")

Check warning on line 56 in utils/pprof.go

View check run for this annotation

Codecov / codecov/patch

utils/pprof.go#L55-L56

Added lines #L55 - L56 were not covered by tests
}
}
71 changes: 71 additions & 0 deletions utils/pprof_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package utils

import (
"io/ioutil"
"os"
"testing"
)

func TestCPUProfile(t *testing.T) {
tmpFile, err := os.CreateTemp("", "cpu_profile.*.prof")
if err != nil {
t.Fatalf("Failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()

profiler := NewPprofUtils(CPU, tmpFile.Name())
defer profiler.StopProfile()

profiler.StartProfile()

doCPUWork()
}

func TestHeapProfile(t *testing.T) {
tmpFile, err := ioutil.TempFile("", "heap_profile.*.prof")
if err != nil {
t.Fatalf("Failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()

profiler := NewPprofUtils(HEAP, tmpFile.Name())
defer profiler.StopProfile()
profiler.StartProfile()

doMemoryWork()
}

func TestInvalidProfileType(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Error("Expected panic for invalid profile type, but got none")
}
}()

p := NewPprofUtils(ProfileType(99), "invalid.prof")
p.StartProfile()
defer p.StopProfile()
}

func TestFileCreateError(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Error("Expected panic when file creation fails, but got none")
}
}()

_ = NewPprofUtils(CPU, "/nonexistent/cpu.prof")
}

func doCPUWork() {
for i := 0; i < 1e6; i++ {
_ = i * i
}
}

func doMemoryWork() {
data := make([]byte, 10<<20)
_ = data
}