这里是一些脚本调用的地方,工具源码放在后面两个代码块了。
util.TaskConsumer[[]string](10).
SetP(lineopt.IterExcel2("xxx.xlsx")).
SetC(func(index int, row []string) (err error) {
if index == 1 {
return
}
// .....
// 这里是逻辑处理函数
return
}).
Run()
这是两个封装的函数的源码。
package lineopt
import (
"bufio"
"fmt"
"github.com/xuri/excelize/v2"
"iter"
"log/slog"
"os"
)
func IterLine2(filePath string) iter.Seq2[int, string] {
return func(yield func(int, string) bool) {
f, errF := os.OpenFile(filePath, os.O_RDONLY, 0666)
if errF != nil {
return
}
defer func(f *os.File) {
err := f.Close()
if err != nil {
fmt.Println(err)
}
}(f)
scanner := bufio.NewScanner(f)
index := 1
for scanner.Scan() {
line := scanner.Text()
if !yield(index, line) {
return
}
index += 1
}
}
}
func IterLine(filePath string) iter.Seq[string] {
return func(yield func(string) bool) {
for _, item := range IterLine2(filePath) {
if !yield(item) {
return
}
}
}
}
func MapIterExcel2(config ExcelTarget) iter.Seq2[int, []string] {
return func(yield func(int, []string) bool) {
f, err := excelize.OpenFile(config.FilePath)
if err != nil {
slog.Error(err.Error())
return
}
defer f.Close()
targetSheet := config.TargetSheet
if targetSheet == "" {
targetSheet = f.GetSheetName(0)
}
rows, err := f.Rows(targetSheet)
if err != nil {
slog.Error(err.Error())
return
}
index := 1
for rows.Next() {
row, err := rows.Columns()
if err != nil {
slog.Error(err.Error())
return
}
if !yield(index, row) {
return
}
index += 1
}
return
}
}
func MapIterExcel(config ExcelTarget) iter.Seq[[]string] {
return func(yield func([]string) bool) {
for _, value := range MapIterExcel2(config) {
if !yield(value) {
return
}
}
}
}
func IterExcel2(filePath string) iter.Seq2[int, []string] {
return func(yield func(int, []string) bool) {
for index, value := range MapIterExcel2(ExcelTarget{FilePath: filePath}) {
if !yield(index, value) {
return
}
}
}
}
func IterExcel(filePath string) iter.Seq[[]string] {
return func(yield func([]string) bool) {
for _, value := range MapIterExcel2(ExcelTarget{FilePath: filePath}) {
if !yield(value) {
return
}
}
}
}
func IterExcelSheet2(filePath string, sheetName string) iter.Seq2[int, []string] {
return func(yield func(int, []string) bool) {
for index, value := range MapIterExcel2(ExcelTarget{
FilePath: filePath,
TargetSheet: sheetName,
}) {
if !yield(index, value) {
return
}
}
}
}
func IterExcelSheet(filePath string, sheetName string) iter.Seq[[]string] {
return func(yield func([]string) bool) {
for _, value := range MapIterExcel2(ExcelTarget{
FilePath: filePath,
TargetSheet: sheetName,
}) {
if !yield(value) {
return
}
}
}
}
package util
import (
"dt/app/util/lineopt"
"errors"
"iter"
"sync"
)
func ChannelConsume[d any](queue chan d, job func(item d), number ...int) *sync.WaitGroup {
counter := 10
if len(number) == 1 && number[0] > 0 {
counter = number[0]
}
return StartTogether(func() {
for item := range queue {
job(item)
}
}, counter)
}
// Together 并行执行
func Together(job func(), counter int) {
var wg sync.WaitGroup
for i := 1; i <= counter; i++ {
wg.Add(1)
go func() {
defer wg.Done()
job()
}()
}
wg.Wait()
}
func StartTogether(job func(), counter int) *sync.WaitGroup {
var wg sync.WaitGroup
for i := 1; i <= counter; i++ {
wg.Add(1)
go func() {
defer wg.Done()
job()
}()
}
return &wg
}
type chanData[d any] struct {
index int
data d
}
func ChannelConsume2[d any](queue chan chanData[d], job func(index int, item d) (err error), number ...int) *sync.WaitGroup {
counter := 10
if len(number) == 1 && number[0] > 0 {
counter = number[0]
}
return StartTogether(func() {
for item := range queue {
err := job(item.index, item.data)
if errors.Is(err, lineopt.Stop) {
// 目前不可以直接停止,会导致消费者阻塞掉
//return
}
}
}, counter)
}
type ProducerConsumer[T any] struct {
consumerNumber int
queue chan chanData[T]
p iter.Seq2[int, T]
c func(index int, item T) (err error)
once sync.Once
}
func (itself *ProducerConsumer[T]) SetC(c func(index int, item T) (err error)) *ProducerConsumer[T] {
itself.c = c
return itself
}
func (itself *ProducerConsumer[T]) SetP(p iter.Seq2[int, T]) *ProducerConsumer[T] {
itself.p = p
return itself
}
// 生产者消费者都有可能发生阻塞,
// 生产者阻塞的原因是因为 queue 容量不够了
// 消费者阻塞的原因的是因为 queue 没有 close
// 生产者只需要实现即可
func (itself *ProducerConsumer[T]) do() {
task := ChannelConsume2(itself.queue, func(index int, item T) (err error) {
return itself.c(index, item)
}, itself.consumerNumber)
defer task.Wait()
defer close(itself.queue)
for index, v := range itself.p {
select {
case itself.queue <- chanData[T]{
index,
v,
}:
break
// 需要一个可以知道提前截止的操作
}
}
}
func (itself *ProducerConsumer[T]) Run() {
itself.once.Do(func() {
itself.do()
})
}
func TaskConsumer[T any](consumerNumber ...int) *ProducerConsumer[T] {
n := 1
if len(consumerNumber) > 0 {
n = consumerNumber[0]
}
return &ProducerConsumer[T]{
queue: make(chan chanData[T], n),
consumerNumber: n,
}
}
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.