diff --git a/examples/parallel_merge_sort/parallel_merge_sort.go b/examples/parallel_merge_sort/parallel_merge_sort.go index 550370f..62ca586 100644 --- a/examples/parallel_merge_sort/parallel_merge_sort.go +++ b/examples/parallel_merge_sort/parallel_merge_sort.go @@ -10,6 +10,7 @@ import ( "sync" gtf "github.com/noneback/go-taskflow" + "github.com/noneback/go-taskflow/utils" ) // merge sorted src to sorted dest @@ -36,18 +37,23 @@ func mergeInto(dest, src []int) []int { return tmp } func main() { - size := 100 - radomArr := make([][]int, 10) - sortedArr := make([]int, 0, 10*size) + pprof := utils.NewPprofUtils(utils.CPU, "./out.prof") + pprof.StartProfile() + defer pprof.StopProfile() + + size := 10000 + share := 1000 + randomArr := make([][]int, share) + sortedArr := make([]int, 0, share*size) mutex := &sync.Mutex{} - for i := 0; i < 10; i++ { + for i := 0; i < share; i++ { for j := 0; j < size; j++ { - radomArr[i] = append(radomArr[i], rand.Int()) + randomArr[i] = append(randomArr[i], rand.Int()) } } - sortTasks := make([]*gtf.Task, 10) + sortTasks := make([]*gtf.Task, share) tf := gtf.NewTaskFlow("merge sort") done := tf.NewTask("Done", func() { if !slices.IsSorted(sortedArr) { @@ -57,19 +63,19 @@ func main() { fmt.Println(sortedArr[:1000]) }) - for i := 0; i < 10; i++ { - sortTasks[i] = tf.NewTask("sort_"+strconv.Itoa(i), func() { - arr := radomArr[i] + for i := 0; i < share; i++ { + idx := i + sortTasks[idx] = tf.NewTask("sort_"+strconv.Itoa(idx), func() { + arr := randomArr[idx] slices.Sort(arr) mutex.Lock() defer mutex.Unlock() sortedArr = mergeInto(sortedArr, arr) }) - } done.Succeed(sortTasks...) - executor := gtf.NewExecutor(1000) + executor := gtf.NewExecutor(1000000) executor.Run(tf).Wait() diff --git a/examples/priority/priority.go b/examples/priority/priority.go index 1ddab5b..773224e 100644 --- a/examples/priority/priority.go +++ b/examples/priority/priority.go @@ -1,4 +1,4 @@ -package priority +package main import ( "fmt" diff --git a/utils/pprof.go b/utils/pprof.go new file mode 100644 index 0000000..f29a1c5 --- /dev/null +++ b/utils/pprof.go @@ -0,0 +1,58 @@ +package utils + +import ( + "os" + "runtime/pprof" +) + +type ProfileType int + +type PprofUtils struct { + f *os.File + profile ProfileType +} + +func NewPprofUtils(profile ProfileType, output string) *PprofUtils { + p := &PprofUtils{} + f, err := os.Create(output) + if err != nil { + panic(err) + } + p.f = f + p.profile = profile + + return p +} + +const ( + CPU ProfileType = iota + HEAP +) + +func (p *PprofUtils) StartProfile() { + switch p.profile { + case CPU: + if err := pprof.StartCPUProfile(p.f); err != nil { + panic(err) + } + case HEAP: + if err := pprof.WriteHeapProfile(p.f); err != nil { + panic(err) + } + default: + panic("unsupported profile type") + } + +} + +func (p *PprofUtils) StopProfile() { + defer p.f.Close() + + switch p.profile { + case CPU: + pprof.StopCPUProfile() + case HEAP: + default: + panic("unsupported profile type") + } +} diff --git a/utils/pprof_test.go b/utils/pprof_test.go new file mode 100644 index 0000000..7103540 --- /dev/null +++ b/utils/pprof_test.go @@ -0,0 +1,71 @@ +package utils + +import ( + "io/ioutil" + "os" + "testing" +) + +func TestCPUProfile(t *testing.T) { + tmpFile, err := os.CreateTemp("", "cpu_profile.*.prof") + if err != nil { + t.Fatalf("Failed to create temp file: %v", err) + } + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + profiler := NewPprofUtils(CPU, tmpFile.Name()) + defer profiler.StopProfile() + + profiler.StartProfile() + + doCPUWork() +} + +func TestHeapProfile(t *testing.T) { + tmpFile, err := ioutil.TempFile("", "heap_profile.*.prof") + if err != nil { + t.Fatalf("Failed to create temp file: %v", err) + } + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + profiler := NewPprofUtils(HEAP, tmpFile.Name()) + defer profiler.StopProfile() + profiler.StartProfile() + + doMemoryWork() +} + +func TestInvalidProfileType(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Error("Expected panic for invalid profile type, but got none") + } + }() + + p := NewPprofUtils(ProfileType(99), "invalid.prof") + p.StartProfile() + defer p.StopProfile() +} + +func TestFileCreateError(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Error("Expected panic when file creation fails, but got none") + } + }() + + _ = NewPprofUtils(CPU, "/nonexistent/cpu.prof") +} + +func doCPUWork() { + for i := 0; i < 1e6; i++ { + _ = i * i + } +} + +func doMemoryWork() { + data := make([]byte, 10<<20) + _ = data +}