Go Gin文件验证和10w+excel大数据量流式读取快速导入
根据gin文档中的自定义验证multipart.FileHeader类型的字段在验证过程中可能会遇到问题。为了解决这个问题,您可以使用自定义验证器来处理multipart.FileHeader类型的字段。github.com/zeromicro/go-zero/core/threading 控制并发。这里我使用了github.com/dustin/go-humanize这个库解析文件大小单位。gi
·
1. golang在gin中实现文件验证
- 根据gin文档中的自定义验证multipart.FileHeader类型的字段在验证过程中可能会遇到问题
- 为了解决这个问题,您可以使用自定义验证器来处理multipart.FileHeader类型的字段。
- 实现方式使用 RegisterStructValidation 注册, 使用 RegisterValidation 注册会存在问题
- FileExeclUploadValidation方法中会使用反射获取tag实现动态验证
- 这里我使用了github.com/dustin/go-humanize这个库解析文件大小单位
安装依赖
go get github.com/dustin/go-humanize
go get github.com/jinzhu/copier
go get github.com/samber/lo
go get github.com/zeromicro/go-zero/core/threading
go get github.com/xuri/excelize/v2
文件验证代码如下
func init() {
validate, ok := binding.Validator.Engine().(*validator.Validate)
if ok {
validate.RegisterStructValidation(FileExeclUploadValidation, FileExeclUpload{})
}
}
type FileExeclUpload struct {
File *multipart.FileHeader `form:"file" binding:"required" fileSize:"10M" fileSuffix:"xlsx|xls" msg:"请上传10M大小内的Excel表格"`
}
// 文件验证
func FileExeclUploadValidation(sl validator.StructLevel) {
form := sl.Current().Interface().(FileExeclUpload)
formType := reflect.TypeOf(form)
for i := 0; i < formType.NumField(); i++ {
field := formType.Field(i)
if field.Type != reflect.TypeOf(&multipart.FileHeader{}) {
continue
}
fileTag := field.Tag.Get("form")
if fileTag == "" {
continue
}
fileField := reflect.ValueOf(form).Field(i)
file := fileField.Interface().(*multipart.FileHeader)
if file == nil {
continue
}
fileSizeTag := field.Tag.Get("fileSize")
if !CheckFileSize(file, fileSizeTag) {
sl.ReportError(file, field.Name, "fileSize", "CheckFileSize", fmt.Sprintf("请上传%s大小内的文件", fileSizeTag))
}
fileSuffixTag := field.Tag.Get("fileSuffix")
if !CheckFileSuffix(file, fileSuffixTag) {
sl.ReportError(file, field.Name, "fileSuffix", "CheckFileSuffix", fmt.Sprintf("请上传扩展名为%s的文件", fileSuffixTag))
}
}
}
func CheckFileSize(file *multipart.FileHeader, fileSize string) bool {
maxSize, err := humanize.ParseBytes(fileSize)
if err != nil {
maxSize = 10 * 1024 * 1024 // 10M
}
return uint64(file.Size) <= maxSize
}
func CheckFileSuffix(file *multipart.FileHeader, allowedSuffixes string) bool {
ext := strings.ToLower(filepath.Ext(file.Filename))
allowed := strings.Split(allowedSuffixes, "|")
for _, suffix := range allowed {
if ext == "."+suffix {
return true
}
}
return false
}
func main() {
r := gin.Default()
v := validator.New()
r.POST("/upload", func(c *gin.Context) {
req := request.FileExeclUpload{}
// 参数验证
if err := c.ShouldBind(&req); err != nil {
response.Fail(c, response.Response{
Code: 10001,
Msg: request.GetErrorMessage(err, req),
})
return
}
c.JSON(http.StatusOK, gin.H{"message": "File uploaded successfully!"})
})
r.Run(":8080")
}
2. golang在gin中实现大量excel数据上传插入
- 数据量大的情况下一次性写入会导入时间、内存方面都会有问题
- 我解决的方案就是分组导入,一次性导入1000、并发控制100
- 在里面我使用了几个很实用的库
- github.com/jinzhu/copier 可以B结构体数据复制给A结构体上面
- github.com/samber/lo 操作map或者slice提供了很多方法
- github.com/zeromicro/go-zero/core/threading 控制并发
这是第一种写法不是流式读取
controller/import.go
func ImportWhite(c *gin.Context) {
req := request.FileExeclUpload{}
// 参数验证
if err := c.ShouldBind(&req); err != nil {
response.Fail(c, response.Response{
Code: 10001,
Msg: request.GetErrorMessage(err, req),
})
return
}
// 打开文件
file, err := req.File.Open()
if err != nil {
response.Fail(c, response.Response{
Code: 10001,
Msg: err.Error(),
})
return
}
defer func() {
if err := file.Close(); err != nil {
fmt.Println(err.Error())
}
}()
f, err := excelize.OpenReader(file)
if err != nil {
response.Fail(c, response.Response{
Code: 10001,
Msg: err.Error(),
})
return
}
defer func() {
if err := f.Close(); err != nil {
fmt.Println(err.Error())
}
}()
resp, err := service.ImportWhite(c, f)
if err != nil {
response.Fail(c, response.Response{
Code: 10001,
Msg: err.Error(),
})
return
}
response.Success(c, response.Response{
Code: 10000,
Data: resp,
Msg: "导入成功",
})
}
service/import.go
func ImportWhite(c *gin.Context, f *excelize.File) (*response.ImportWhiteResp, error) {
var (
response = response.ImportWhiteResp{}
now = time.Now()
creatorName = c.GetString("user_name")
creatorId = int32(c.GetInt("user_id"))
)
rows, err := f.GetRows(f.GetSheetName(f.GetActiveSheetIndex()))
rows = rows[1:]
if err != nil {
return nil, err
}
// 只获取第一列去重
nameList := []string{}
lo.ForEach[[]string](rows, func(item []string, index int) {
nameList = append(nameList, item[0])
})
yLen := len(nameList)
nameList = lo.Uniq[string](nameList)
response.FailTotal += uint32(yLen - len(nameList))
// 并发插入
namesBatches := lo.Chunk[string](nameList, 1000)
wg := sync.WaitGroup{}
task := threading.NewTaskRunner(100)
for _, batch := range namesBatches {
wg.Add(1)
names := batch
task.Schedule(func() {
defer wg.Done()
// 查询和插入
uniqNum, err := model.CreateUniqWhite(names, creatorName, creatorId)
if err == nil {
response.SuccessTotal += uint32(len(names)) - uniqNum
}
response.FailTotal += uniqNum
})
}
wg.Wait()
return &response, nil
}
model/import.go
// 去重插入
func CreateUniqWhite(names []string, creatorName string, creatorId int32) (uniqNum uint32, err error) {
now := time.Now()
db := global.DB.Model(WhiteList{})
// 查询
list, err := GetWhiteNameListByNames(names)
if err != nil {
return
}
// 和数据库对比去重
left, _ := lo.Difference[string](names, list)
uniqNum += uint32(len(names) - len(left))
whiteData := []WhiteList{}
lo.ForEach[string](left, func(item string, _ int) {
whiteData = append(whiteData, WhiteList{
CreatorName: creatorName,
CreatorID: creatorId,
CreatedAt: now,
WhiteName: item,
})
})
if len(whiteData) > 0 {
err = db.Create(&whiteData).Error
}
return uniqNum, err
}
第二种方式改良版本快一倍,流式读取导入
service/import.go
// 改用流式读取比 10W数据导入本地6.7s-->比不是流式读取快一倍
func ImportWhite(c *gin.Context, f *excelize.File) (*response.ImportWhiteResp, error) {
var (
response = response.ImportWhiteResp{}
creatorName = c.GetString("user_name")
creatorId = int32(c.GetInt("user_id"))
)
excelReader := util.NewExcelReader(f)
err := excelReader.ReadAndSendToChannel(1, 1000)
if err != nil {
return nil, err
}
wg := sync.WaitGroup{}
task := threading.NewTaskRunner(100)
for rows := range excelReader.DataChan {
yLen := len(rows)
// 第一列去重
seen := make(map[string]struct{}, len(rows))
names := lo.FilterMap[[]string, string](rows, func(item []string, index int) (string, bool) {
name := item[0]
if _, ok := seen[name]; ok {
return name, false
}
seen[name] = struct{}{}
return name, true
})
// 并发插入
wg.Add(1)
task.Schedule(func() {
defer wg.Done()
// 插入数据
uniqNum, err := model.CreateUniqWhite(names, creatorName, creatorId)
if err == nil {
response.SuccessTotal += uint32(len(names)) - uniqNum
}
response.FailTotal += uniqNum + uint32(yLen-len(names))
})
}
wg.Wait()
return &response, nil
}
util/excel.go
package util
import (
"github.com/samber/lo"
"github.com/xuri/excelize/v2"
"github.com/zeromicro/go-zero/core/threading"
)
type ExcelReader struct {
file *excelize.File
DataChan chan [][]string
}
func NewExcelReader(file *excelize.File) *ExcelReader {
return &ExcelReader{
file: file,
DataChan: make(chan [][]string, 100),
}
}
// start 从第几行开始读
// 每 n 份数量的数据发送到管道
func (r *ExcelReader) ReadAndSendToChannel(start int, num int) error {
rows, err := r.file.Rows(r.file.GetSheetName(r.file.GetActiveSheetIndex()))
if err != nil {
return err
}
results, count, i := make([][]string, 0, num), 0, 0
// 这里我用的gozero封装的协程,里面带了异常捕获
threading.GoSafe(func() {
defer close(r.DataChan)
defer rows.Close()
for rows.Next() {
i++
if start >= i {
continue
}
row, err := rows.Columns()
if err != nil {
break
}
if len(row) > 0 {
count++
results = append(results, row)
}
if count%num == 0 {
r.DataChan <- results
count = 0
results = make([][]string, 0, num)
}
}
if count > 0 {
r.DataChan <- results
}
})
return nil
}
model/import.go
var createMu sync.Mutex
// 去重插入
func CreateUniqWhite(names []string, creatorName string, creatorId int32) (uniqNum uint32, err error) {
// 使用互斥锁,可能多个缓冲组数据有重复,插入就会报异常
createMu.Lock()
defer createMu.Unlock()
…………其他代码不变
}
第三种导入方式不需要互斥锁增加过滤回调方法,在第二个方案上优化10w用时4.5s
util/excel.go 上新增一个方法
// start 从第几行开始读
// num 每 n 份数量的数据发送到管道
// columnsFilter 需要过滤的列
// predicate 过滤函数
func (r *ExcelReader) ReadAndSendToChannelFilter(start int, num int, columnsFilter []int, predicate func([]map[string]struct{}, []string) bool) error {
rows, err := r.file.Rows(r.file.GetSheetName(r.file.GetActiveSheetIndex()))
if err != nil {
return err
}
results, count, i := make([][]string, 0, num), 0, 0
dataAll := make([]map[string]struct{}, len(columnsFilter))
// 这里我用的gozero封装的协程,里面带了异常捕获
threading.GoSafe(func() {
defer close(r.DataChan)
defer rows.Close()
for rows.Next() {
i++
if start >= i {
continue
}
row, err := rows.Columns()
if err != nil {
break
}
// 长度判断 + 过滤函数
if len(row) <= 0 || predicate(dataAll, row) {
continue
}
// 加入map切片
lo.ForEach[int](columnsFilter, func(item, _ int) {
key := row[item]
seen := dataAll[item]
if seen == nil {
seen = make(map[string]struct{})
}
seen[key] = struct{}{}
dataAll[item] = seen
})
// 加入队列切片
count++
results = append(results, row)
// 重置计算器和切片
if count-num == 0 {
r.DataChan <- results
count = 0
results = make([][]string, 0, num)
}
}
if count > 0 {
r.DataChan <- results
}
})
return nil
}
servcie/import.go 中使用新方法
excelReader := util.NewExcelReader(f)
// 旧方法
// err := excelReader.ReadAndSendToChannel(1, 1000)
// 过滤导入又快了2.1s 10w用时 4.61
err := excelReader.ReadAndSendToChannelFilter(1, 1000, []int{0}, func(m []map[string]struct{}, s []string) bool {
nameMap := m[0]
name := s[0]
_, ok := nameMap[name]
if ok {
response.FailTotal++
}
return ok
})
model/import.go 中删掉 sync.Lock
// var createMu sync.Mutex
// 去重插入
func CreateUniqWhite(names []string, creatorName string, creatorId int32) (uniqNum uint32, err error) {
// 使用互斥锁,可能多个缓冲组数据有重复,插入就会报异常 ---> 过滤导入不需要互斥锁
// createMu.Lock()
// defer createMu.Unlock()
到你自己项目中使用,你得改动一下request和response的struct
我只是给你们提供一下思路和代码片段
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献1条内容
所有评论(0)