看到公司其他同学写的 go 批量处理代码,这风骚的感觉像在平行世界一样,还能这么玩?

64 天前
 PungentSauce

这里是一些脚本调用的地方,工具源码放在后面两个代码块了。

util.TaskConsumer[[]string](10).
		SetP(lineopt.IterExcel2("xxx.xlsx")).
		SetC(func(index int, row []string) (err error) {
			if index == 1 {
				return
			}
			// .....
			// 这里是逻辑处理函数
			return
		}).
		Run()

这是两个封装的函数的源码。

package lineopt

import (
	"bufio"
	"fmt"
	"github.com/xuri/excelize/v2"
	"iter"
	"log/slog"
	"os"
)

func IterLine2(filePath string) iter.Seq2[int, string] {
	return func(yield func(int, string) bool) {
		f, errF := os.OpenFile(filePath, os.O_RDONLY, 0666)
		if errF != nil {
			return
		}
		defer func(f *os.File) {
			err := f.Close()
			if err != nil {
				fmt.Println(err)
			}
		}(f)
		scanner := bufio.NewScanner(f)
		index := 1
		for scanner.Scan() {
			line := scanner.Text()
			if !yield(index, line) {
				return
			}
			index += 1
		}
	}
}

func IterLine(filePath string) iter.Seq[string] {
	return func(yield func(string) bool) {
		for _, item := range IterLine2(filePath) {
			if !yield(item) {
				return
			}
		}
	}
}

func MapIterExcel2(config ExcelTarget) iter.Seq2[int, []string] {
	return func(yield func(int, []string) bool) {
		f, err := excelize.OpenFile(config.FilePath)
		if err != nil {
			slog.Error(err.Error())
			return
		}
		defer f.Close()
		targetSheet := config.TargetSheet
		if targetSheet == "" {
			targetSheet = f.GetSheetName(0)
		}
		rows, err := f.Rows(targetSheet)
		if err != nil {
			slog.Error(err.Error())
			return
		}
		index := 1
		for rows.Next() {
			row, err := rows.Columns()
			if err != nil {
				slog.Error(err.Error())
				return
			}
			if !yield(index, row) {
				return
			}
			index += 1
		}
		return
	}
}

func MapIterExcel(config ExcelTarget) iter.Seq[[]string] {
	return func(yield func([]string) bool) {
		for _, value := range MapIterExcel2(config) {
			if !yield(value) {
				return
			}
		}
	}
}

func IterExcel2(filePath string) iter.Seq2[int, []string] {
	return func(yield func(int, []string) bool) {
		for index, value := range MapIterExcel2(ExcelTarget{FilePath: filePath}) {
			if !yield(index, value) {
				return
			}
		}
	}
}

func IterExcel(filePath string) iter.Seq[[]string] {
	return func(yield func([]string) bool) {
		for _, value := range MapIterExcel2(ExcelTarget{FilePath: filePath}) {
			if !yield(value) {
				return
			}
		}
	}
}

func IterExcelSheet2(filePath string, sheetName string) iter.Seq2[int, []string] {
	return func(yield func(int, []string) bool) {
		for index, value := range MapIterExcel2(ExcelTarget{
			FilePath:    filePath,
			TargetSheet: sheetName,
		}) {
			if !yield(index, value) {
				return
			}
		}
	}
}

func IterExcelSheet(filePath string, sheetName string) iter.Seq[[]string] {
	return func(yield func([]string) bool) {
		for _, value := range MapIterExcel2(ExcelTarget{
			FilePath:    filePath,
			TargetSheet: sheetName,
		}) {
			if !yield(value) {
				return
			}
		}
	}
}
package util

import (
	"dt/app/util/lineopt"
	"errors"
	"iter"
	"sync"
)

func ChannelConsume[d any](queue chan d, job func(item d), number ...int) *sync.WaitGroup {
	counter := 10
	if len(number) == 1 && number[0] > 0 {
		counter = number[0]
	}
	return StartTogether(func() {
		for item := range queue {
			job(item)
		}
	}, counter)
}

// Together 并行执行
func Together(job func(), counter int) {
	var wg sync.WaitGroup
	for i := 1; i <= counter; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			job()
		}()
	}
	wg.Wait()
}

func StartTogether(job func(), counter int) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 1; i <= counter; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			job()
		}()
	}
	return &wg
}

type chanData[d any] struct {
	index int
	data  d
}

func ChannelConsume2[d any](queue chan chanData[d], job func(index int, item d) (err error), number ...int) *sync.WaitGroup {
	counter := 10
	if len(number) == 1 && number[0] > 0 {
		counter = number[0]
	}
	return StartTogether(func() {
		for item := range queue {
			err := job(item.index, item.data)
			if errors.Is(err, lineopt.Stop) {
				// 目前不可以直接停止,会导致消费者阻塞掉
				//return
			}
		}
	}, counter)
}

type ProducerConsumer[T any] struct {
	consumerNumber int
	queue          chan chanData[T]
	p              iter.Seq2[int, T]
	c              func(index int, item T) (err error)
	once           sync.Once
}

func (itself *ProducerConsumer[T]) SetC(c func(index int, item T) (err error)) *ProducerConsumer[T] {
	itself.c = c
	return itself
}

func (itself *ProducerConsumer[T]) SetP(p iter.Seq2[int, T]) *ProducerConsumer[T] {
	itself.p = p
	return itself
}

// 生产者消费者都有可能发生阻塞,
// 生产者阻塞的原因是因为 queue 容量不够了
// 消费者阻塞的原因的是因为 queue 没有 close
// 生产者只需要实现即可
func (itself *ProducerConsumer[T]) do() {
	task := ChannelConsume2(itself.queue, func(index int, item T) (err error) {
		return itself.c(index, item)
	}, itself.consumerNumber)
	defer task.Wait()
	defer close(itself.queue)
	for index, v := range itself.p {
		select {
		case itself.queue <- chanData[T]{
			index,
			v,
		}:
			break
			// 需要一个可以知道提前截止的操作
		}
	}

}

func (itself *ProducerConsumer[T]) Run() {
	itself.once.Do(func() {
		itself.do()
	})
}

func TaskConsumer[T any](consumerNumber ...int) *ProducerConsumer[T] {
	n := 1
	if len(consumerNumber) > 0 {
		n = consumerNumber[0]
	}
	return &ProducerConsumer[T]{
		queue:          make(chan chanData[T], n),
		consumerNumber: n,
	}
}

6111 次点击
所在节点    Go 编程语言
28 条回复
geebos
63 天前
写了好几年的 go ,很难接受这样的写法
feedcode
63 天前
用了新的 feature, range over functions

https://go.dev/blog/range-functions
As of Go 1.23 it now supports ranging over functions that take a single argument. The single argument must itself be a function that takes zero to two arguments and returns a bool; by convention, we call it the yield function.

```
func(yield func() bool)

func(yield func(V) bool)

func(yield func(K, V) bool)
```
leowyzhuang
63 天前
人和程序有一个能跑就行
kfpenn
61 天前
看了下这个新特性,所以这个东西只是让以后的库能有一个统一的遍历方法,但里面可能还是用的 scanner.Scan(),Rows.Next () 这些?
bronyakaka
61 天前
因为 go 社区很多人吹 go 是函数式编程 结果啥函数式 api 都没有,可能还是泛型的问题
bronyakaka
61 天前
可以看下社区模拟函数式 api 的 lo 库,基本上包含了楼主示例里这些封装
leokun
61 天前
我说 fp 是防御性编程的一种,大家没意见吧
R136a1
61 天前
函数式编程没啥问题,但是 go 的函数式是真难看,通篇读下来我只看到一堆花括号在天上飞

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://ex.noerr.eu.org/t/1147599

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX