How to solve the problem of go language [mysql] packets.go:436: busy buffer?

< hr >

finally found that time.Sleep (100 * time.Millisecond) will affect the database. After removing it, I didn"t encounter the problem of busy buffer

. < hr >

I have encountered some problems in converting a piece of data from Table 1 to Table 2 in the past two days.

error prompt:

post total: 2041781
 bbs_post 
 bbs_post 
 2040031 / 2041781  post: 0errlongDataArr: []
[mysql] 2018/03/31 02:04:52 packets.go:72: read tcp 172.16.0.11:19472->123.207.235.24:3306: read: connection reset by peer
[mysql] 2018/03/31 02:04:52 packets.go:436: busy buffer
 bbs_post (2040031)

the code is as follows:
GitHub: https://github.com/skiy/xiuno.

package xn3ToXn4

import (
    "fmt"
    _ "github.com/go-sql-driver/mysql"
    "github.com/skiy/xiuno-tools/lib"
    "log"
    "strings"
    "time"
)

type post struct {
    db3str,
    db4str dbstr
    fields postFields
    waitFix,
    count,
    total int
}

type postFields struct {
    tid, pid, uid, isfirst, create_date, userip, images, files, message, message_fmt string
}

func (this *post) update() {
    if !lib.AutoUpdate(this.db4str.Auto, this.db4str.DBPre+"post") {
        return
    }

    currentTime := time.Now()

    err := this.toUpdate(this.waitFix)
    if err != nil {
        log.Fatalln(" " + this.db3str.DBPre + "post : " + err.Error())
    }

    this.toUpdate(this.waitFix)

    fmt.Printf(" %spost (%d)\r\n", this.db3str.DBPre, this.count)

    fmt.Println("\r\n post : ", time.Since(currentTime))
}

func (this *post) toUpdate(fixFlag int) (err error) {

    xn3pre := this.db3str.DBPre
    xn4pre := this.db4str.DBPre

    xn3db, err := this.db3str.Connect()
    query, err := xn3db.Query("select * from " + xn3pre + "post")
    if err != nil {
        fmt.Println("", err.Error())
        return
    }
    defer query.Close()

    oldField := "tid,pid,uid,isfirst,create_date,userip,images,files,message"
    fields := oldField + ",message_fmt"
    msgFmtExist := false
    cols, _ := query.Columns()
    for _, v := range cols {
        if v == "message_fmt" {
            oldField += ",message_fmt"
            msgFmtExist = true
            break
        }
    }

    xn3 := fmt.Sprintf("SELECT %s FROM %spost", oldField, xn3pre)
    xn5 := fmt.Sprintf("INSERT INTO %spost (%s) VALUES ", xn4pre, fields)
    qmark := this.db3str.FieldMakeValue(fields)

    qmark2 := this.db3str.FieldMakeQmark(fields, "?")
    xn4 := fmt.Sprintf("INSERT INTO %spost (%s) VALUES (%s)", xn4pre, fields, qmark2)
    //fmt.Println("Xiuno 3: " + xn3)
    //fmt.Println("Xiuno 5: " + xn5)

    if fixFlag == 1 {
        return this.fixPost(oldField, xn4, msgFmtExist)
    }

    xn3count := fmt.Sprintf("SELECT COUNT(*) AS count FROM %spost", xn3pre)
    rows := xn3db.QueryRow(xn3count)
    rows.Scan(&this.total)
    fmt.Printf("post total: %d \r\n", this.total)

    data, err := xn3db.Query(xn3)
    if err != nil {
        log.Fatalln(xn3, err.Error())
    }
    defer data.Close()

    xn4db, _ := this.db4str.Connect()

    xn4db.SetMaxIdleConns(0)
    xn4db.SetMaxOpenConns(100)
    xn4db.SetConnMaxLifetime(time.Second * 10)

    xn4Clear := "TRUNCATE `" + xn4pre + "post`"
    _, err = xn4db.Exec(xn4Clear)
    if err != nil {
        log.Fatalf("::: %spost : "+err.Error(), xn4pre)
    }
    fmt.Printf(" %spost \r\n", xn4pre)

    fmt.Printf(" %spost \r\n", xn4pre)

    //dataArr := make([]postFields, ...)

    var dataArr []postFields
    var longDataArr, errLongDataArr [][]postFields

    var sqlStr string
    var sqlArr []string

    start := 0
    times := 0
    offset := 50
    maxTimes := 50

    var field postFields
    for data.Next() {
        if msgFmtExist {
            err = data.Scan(
                &field.tid,
                &field.pid,
                &field.uid,
                &field.isfirst,
                &field.create_date,
                &field.userip,
                &field.images,
                &field.files,
                &field.message,
                &field.message_fmt)
        } else {
            err = data.Scan(
                &field.tid,
                &field.pid,
                &field.uid,
                &field.isfirst,
                &field.create_date,
                &field.userip,
                &field.images,
                &field.files,
                &field.message)
        }

        if err != nil {
            fmt.Printf("(%s) \r\n", err.Error())
        } else {
            
            if field.message_fmt == "" {
                field.message_fmt = field.message
            }

            dataArr = append(dataArr, field)
            startPP

            if start%offset == 0 {
                timesPP
                longDataArr = append(longDataArr, dataArr)
                dataArr = nil

                if times >= maxTimes {
                    for _, v := range longDataArr {
                        sqlArr = this.makeFileSql(qmark, v)
                        sqlStr = xn5 + strings.Join(sqlArr, ",")
                        _, err = xn4db.Exec(sqlStr)
                        if err != nil {
                            fmt.Printf("%d - v - (%s) \r\n", start, err.Error())
                            fmt.Println(v)

                            errLongDataArr = append(errLongDataArr, v)
                        } else {
                            this.count += len(v)
                            lib.UpdateProcess(fmt.Sprintf(" %d / %d  post: %d", this.count, this.total, len(errLongDataArr)))
                        }
                    }

                    times = 0
                    longDataArr = nil
                }

                start = 0
            }

        }
    }

    if err = data.Err(); err != nil {
        log.Fatalln("dataErr: " + err.Error())
    }

    if dataArr != nil {
        sqlArr = this.makeFileSql(qmark, dataArr)
        sqlStr = xn5 + strings.Join(sqlArr, ",")
        _, err = xn4db.Exec(sqlStr)

        if err != nil {
            fmt.Printf("dataArr - (%s) \r\n", err.Error())

            errLongDataArr = append(errLongDataArr, dataArr)
        }
        this.count += len(dataArr)
        lib.UpdateProcess(fmt.Sprintf(" %d / %d  post: %d", this.count, this.total, len(errLongDataArr)))
    }

    fmt.Println("errlongDataArr:", errLongDataArr)

    //
    if errLongDataArr != nil {
        stmt, err := xn4db.Prepare(xn4)
        if err != nil {
            log.Fatalln(": " + err.Error())
        }

        start = 0
        errCount := 0
        for _, values := range errLongDataArr {
            for _, value := range values {
                startPP
                fmt.Sprintf(": %d \r\n", start)

                _, err = stmt.Exec(
                    &value.tid,
                    &value.pid,
                    &value.uid,
                    &value.isfirst,
                    &value.create_date,
                    &value.userip,
                    &value.images,
                    &value.files,
                    &value.message,
                    &value.message_fmt)

                if err != nil {
                    fmt.Printf("(%s) \r\n", err.Error())
                    errCountPP
                } else {
                    this.countPP
                    lib.UpdateProcess(fmt.Sprintf(" %d / %d  post: %d", this.count, this.total, errCount))
                }
            }
        }
    }

    if err != nil {
        log.Fatalln("txErr: " + err.Error())
    }

    defer xn3db.Close()
    defer xn4db.Close()

    if this.total-this.count > 0 {
        //,
        this.waitFix = 1
    }
    return err
}

func (this *post) fixPost(oldField, xn4 string, msgFmtExist bool) (err error) {
    sql := "SELECT " + oldField + " FROM %s WHERE pid NOT IN (SELECT pid FROM %s)"

    xn3dbName := this.db3str.DBName + "." + this.db3str.DBPre + "post"
    xn4dbName := this.db4str.DBName + "." + this.db4str.DBPre + "post"
    xn3sql := fmt.Sprintf(sql, xn3dbName, xn4dbName)

    xn3db, err := this.db3str.Connect()
    data, err := xn3db.Query(xn3sql)
    if err != nil {
        fmt.Println("", err.Error())
        return
    }
    defer xn3db.Close()

    if data != nil {

        xn4db, err := this.db4str.Connect()
        stmt, err := xn4db.Prepare(xn4)

        if err != nil {
            log.Fatalln(": " + err.Error())
        }

        var field postFields
        for data.Next() {
            errCount := 0
            if msgFmtExist {
                err = data.Scan(
                    &field.tid,
                    &field.pid,
                    &field.uid,
                    &field.isfirst,
                    &field.create_date,
                    &field.userip,
                    &field.images,
                    &field.files,
                    &field.message,
                    &field.message_fmt)
            } else {
                err = data.Scan(
                    &field.tid,
                    &field.pid,
                    &field.uid,
                    &field.isfirst,
                    &field.create_date,
                    &field.userip,
                    &field.images,
                    &field.files,
                    &field.message)
            }

            if field.message_fmt == "" {
                field.message_fmt = field.message
            }

            _, err = stmt.Exec(
                &field.tid,
                &field.pid,
                &field.uid,
                &field.isfirst,
                &field.create_date,
                &field.userip,
                &field.images,
                &field.files,
                &field.message,
                &field.message_fmt)

            if err != nil {
                fmt.Printf("PID (%s) (%s) \r\n", field.pid, err.Error())
                errCountPP
            } else {
                this.countPP
                lib.UpdateProcess(fmt.Sprintf(" %d / %d  post: %d", this.count, this.total, errCount))
            }
        }
    }

    return
}

func (this *post) makeFileSql(qmark string, dataArr []postFields) (dataStr []string) {
    for _, field := range dataArr {
        dataStr = append(dataStr, "("+fmt.Sprintf(qmark,
            field.tid,
            field.pid,
            field.uid,
            field.isfirst,
            field.create_date,
            field.userip,
            field.images,
            field.files,
            field.message,
            field.message_fmt)+")")
    }
    return
}
Mar.01,2021

is there something wrong with this code or not?

Menu