Golang concurrency problem

allMap stores a set of tasks: each key identifies a task type, and each value holds the parameters of that task. I need to process these tasks concurrently. I tried the two methods below during development and neither worked well, which makes me think I have not understood Go's approach to concurrency. Here are my experience and my doubts; I would appreciate any guidance.

<h1>Method one</h1>
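    // allMap holds the tasks, keyed by task type.
    // Task describes one task.
    type Task struct {
        Params     interface{} // task parameters
        ResultChan chan []byte // results of the task are written here
        // Wg *sync.WaitGroup
    }


    // Start one goroutine per task.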
    for key, value := range allMap {
        go func(k string, v interface{}) {
            log.Debug("k : " , k )
            if k == tools.REQUEST_BAOJIE {
                // A
                log.Debug("baojie elem len : ", len(value))

                one_task = &service.Task{
                    Params:     v,
                    ResultChan: make(chan []byte, len(value)),
                    //Wg : new(sync.WaitGroup) ,
                }
                // B
                log.Debugf("1 one_task : %+v ", one_task)

                // AddTask processes one_task and writes the results to one_task.ResultChan.
                service.AddTask(one_task)

            } else if k == tools.REQUEST {


            }
        }(key, value)
    }

    // C
    log.Debugf("2 one_task : %+v ", one_task)

    // Receive the results.
    go func() {
        for item := range one_task.ResultChan {
            log.Debug("Receive data From ResultChan : ", string(item))
        }
        log.Debug("Process ", tools.REQUEST_BAOJIE, " end ")
    }()

The disadvantage of this method is that it depends too much on the order in which the program happens to execute. During testing I found that when C runs before A and B, the result-receiving goroutine crashes when it accesses the ResultChan member, because ResultChan has not been allocated yet at that point.

Solution 1: give the service.AddTask(one_task) function an additional parameter of type chan<- interface{}. After AddTask finishes processing, it writes the result to this channel; the result-receiving goroutine listens on the channel and reads the result.
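
A minimal sketch of this solution, assuming a single task whose parameters are a list of strings. The names addTask and collect are hypothetical stand-ins (service.AddTask itself is not shown in the question). The caller creates the channel before starting any goroutine, so the receiver can never see an uninitialized channel, and the producer closes the channel when it is done so that the receiving loop ends.

```go
package main

import "fmt"

// addTask is a hypothetical stand-in for service.AddTask. It sends one
// result per parameter to out and closes out when it has finished.
func addTask(params []string, out chan<- interface{}) {
	defer close(out)
	for _, p := range params {
		out <- "done: " + p
	}
}

// collect creates the channel first, then starts the producer, then reads
// until the producer closes the channel.
func collect(params []string) []interface{} {
	results := make(chan interface{})
	go addTask(params, results)

	var got []interface{}
	for r := range results {
		got = append(got, r)
	}
	return got
}

func main() {
	got := collect([]string{"a", "b"})
	fmt.Println(len(got), "results received")
}
```

Because only the producer sends on the channel, it is also the natural place to close it.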

<h1>Method two</h1>

Delay the point at which concurrency starts:

    for k, v := range allMap {
        //go func(k string, v interface{}) {
        log.Debug("k : ", k)
        if k == tools.REQUEST {
            // A
            log.Debug("baojie elem len : ", len(v))
            one_task = &service.Task{
                Params:     v,
                ResultChan: make(chan []byte, len(v)),
                //Wg : new(sync.WaitGroup) ,
            }
            // B
            log.Debugf("1 one_task : %+v ", one_task)
            go service.AddTask(one_task)
        } else if k == tools.REQUEST_TCP {
        }
        //}(key, value)
    }


    // C
    log.Debugf("2 one_task : %+v ", one_task)

    // Receive the results.
    go func() {
        for item := range one_task.ResultChan {
            log.Debug("Receive data From ResultChan : ", string(item))
        }
        log.Debug("Process ", tools.REQUEST_BAOJIE, " end ")
    }()

This ensures that C always runs after A and B. ResultChan is therefore guaranteed to be initialized first, and the result-receiving goroutine reads the data once AddTask has written it into the channel.

<h2>Question 1</h2>

Since method one has a problem, does method two have any drawback in efficiency?

Is there a problem with my concurrency logic?

<h2>Question 2</h2>

Is this idea advisable?

var task Task

// Process each task concurrently.
for key, value := range allMap {
    task = Task{
        params: value,
        result: make(chan interface{}, len(value)), // value is a list
    }
    go processOneByOne(key, value) // starts len(allMap) goroutines
}

// Receive the results.
for result := range task.result {
    // get result from chan
    // to do
}


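## Idea 3

Define a shared chan: processOneByOne writes each result to the chan, and the caller reads the results from the chan. The code is as follows: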

demo.go

func TodoWork () {

go func(){
    for key ,value := range allMap{
        processOneByOne(key , value ) 
    }
}()

for item := range task.ResultChan {
    //  itemkey valueKEYvalue  
    //  TodoWork
    println(item) 
    
}

}

task.go

var (
    ResultChan chan interface{}
)

func init () {

ResultChan = make( chan interface{} , 100 ) 

}

func processOneByOne(key string, value interface{}) {
    // Process the task.
    // ....

    // Write the result to ResultChan so that the receiving goroutine can read it.
    ResultChan <- "Hello World"
}
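
One possible way to let the receiver tell which task a result belongs to, and to let the receiving loop end, is to send the key along with the value and to close the channel once every task has finished. The sketch below assumes one result per task; the result type and the todoWork function are hypothetical, and processOneByOne takes the channel as a parameter here instead of using a package-level variable.

```go
package main

import (
	"fmt"
	"sync"
)

// result pairs a value with the key of the task that produced it.
// This type is an assumption made for the example.
type result struct {
	Key   string
	Value interface{}
}

// processOneByOne handles one task and sends exactly one tagged result.
func processOneByOne(key string, value interface{}, out chan<- result) {
	// ... process the task ...
	out <- result{Key: key, Value: fmt.Sprintf("processed %v", value)}
}

// todoWork runs every task concurrently and returns the results by key.
func todoWork(allMap map[string]interface{}) map[string]interface{} {
	// The buffer holds one result per task, so no sender ever blocks.
	out := make(chan result, len(allMap))
	var wg sync.WaitGroup
	for k, v := range allMap {
		wg.Add(1)
		go func(k string, v interface{}) {
			defer wg.Done()
			processOneByOne(k, v, out)
		}(k, v)
	}
	wg.Wait()
	close(out) // lets the range loop below terminate

	byKey := make(map[string]interface{}, len(allMap))
	for r := range out {
		byKey[r.Key] = r.Value
	}
	return byKey
}

func main() {
	got := todoWork(map[string]interface{}{"a": 1, "b": 2})
	fmt.Println(got["a"], got["b"])
}
```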








Nov.17,2021

You are overthinking it. If too many goroutines would run at the same time, you can cap the number of goroutines, or even use a pool.
Here is a simple implementation:

type task struct {
    name   string
    params interface{}
    result []byte
}

var gt = make(chan int, 10) // at most 10 tasks run at the same time
var tkr = make(chan task, 10)
go func() {
    for _, t := range allTaskMap {
        gt <- 1
        go func(curtask task) {
            // process the task
            tkr <- curtask // send the result to the channel
            <-gt           // release the slot
        }(t)
    }
}()

for {
    select {
    case rt := <-tkr:
        // handle the result rt
    }
}
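
A runnable variant of the same idea, as a sketch. Here gt acts as a counting semaphore: a send takes a slot and blocks while the limit is reached, and a receive frees the slot. The runLimited function, the sync.WaitGroup, and the placeholder work are assumptions added so that the example terminates; they are not part of the snippet above.

```go
package main

import (
	"fmt"
	"sync"
)

type task struct {
	name   string
	params interface{}
	result []byte
}

// runLimited runs each task in its own goroutine, with at most limit
// goroutines working at the same time.
func runLimited(tasks []task, limit int) []task {
	gt := make(chan struct{}, limit)   // semaphore: one slot per running task
	tkr := make(chan task, len(tasks)) // buffered so senders never block
	var wg sync.WaitGroup

	for _, t := range tasks {
		gt <- struct{}{} // blocks while limit tasks are already running
		wg.Add(1)
		go func(cur task) {
			defer wg.Done()
			defer func() { <-gt }() // free the slot when the task is done
			cur.result = []byte("done " + cur.name) // placeholder work
			tkr <- cur
		}(t)
	}

	wg.Wait()
	close(tkr)

	var done []task
	for t := range tkr {
		done = append(done, t)
	}
	return done
}

func main() {
	done := runLimited([]task{{name: "a"}, {name: "b"}, {name: "c"}}, 2)
	fmt.Println(len(done), "tasks finished")
}
```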

In practice, task lists are rarely stored in a map; consider passing the tasks in through a channel instead.
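
A sketch of what passing tasks through a channel could look like, assuming tasks are plain strings and a fixed number of workers. The worker and runPool names are hypothetical. Closing the tasks channel tells the workers that no more work is coming.

```go
package main

import (
	"fmt"
	"sync"
)

// worker reads task names from tasks until the channel is closed and
// sends one result per task.
func worker(tasks <-chan string, results chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	for name := range tasks {
		results <- "done " + name
	}
}

// runPool feeds names to a fixed number of workers (workers must be > 0)
// and returns all results.
func runPool(names []string, workers int) []string {
	tasks := make(chan string)
	results := make(chan string, len(names)) // one slot per expected result
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go worker(tasks, results, &wg)
	}
	for _, n := range names {
		tasks <- n
	}
	close(tasks) // no more tasks: workers leave their range loops

	wg.Wait()
	close(results)

	var out []string
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	out := runPool([]string{"a", "b", "c"}, 2)
	fmt.Println(len(out), "results")
}
```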


@timmu Thank you for your answer. What is the role of the gt channel here?


Wouldn't it work to start receiving results right after initializing one_task? Also, code like this should use fewer if/else chains; a switch/case reads better and performs better. And since this is a loop, it would be better to call a function inside the loop.

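    // allMap holds the tasks, keyed by task type.
    // Task describes one task.
    type Task struct {
        Params     interface{} // task parameters
        ResultChan chan []byte // results of the task are written here
        // Wg *sync.WaitGroup
    }


    // Start one goroutine per task.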
    for key, value := range allMap {
        go func(k string, v interface{}) {
            log.Debug("k : " , k )
            switch k {
            case tools.REQUEST_BAOJIE: 
                // A
                log.Debug("baojie elem len : ", len(value))

                one_task = &service.Task{
                    Params:     v,
                    ResultChan: make(chan []byte, len(value)),
                    //Wg : new(sync.WaitGroup) ,
                }
                // Start receiving results right after one_task is initialized.
                go func() {
                    for item := range one_task.ResultChan {
                        log.Debug("Receive data From ResultChan : ", string(item))
                    }
                    log.Debug("Process ", tools.REQUEST_BAOJIE, " end ")
                }()
                // B
                log.Debugf("1 one_task : %+v ", one_task)

                // AddTask processes one_task and writes the results to one_task.ResultChan.
                service.AddTask(one_task)

            case tools.REQUEST:

            }
        }(key, value)
    }

    // C
    log.Debugf("2 one_task : %+v ", one_task)

  1. In the sample code for methods 1 and 2, one_task is a local variable, so neither C nor the subsequent result-receiving goroutine can access it.
  2. Traversing a map is much less efficient than traversing a slice. I recommend converting allMap into a slice that stores Task values.
  3. Go's concurrency follows the CSP model, with the channel at its core. Wherever an execution order must be enforced, a channel is a good fit. If C must come after A and B, use a channel to solve the ordering problem.
  4. Starting one goroutine per task is the usual practice in Go, as long as the tasks do not depend on each other's execution order.
  5. There is no need to create a channel for every task, unless you do not know how much data the task is going to generate.
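
Point 3 can be sketched as follows, assuming a hypothetical ready channel that carries the task once it has been initialized. The receive in step C cannot complete before the send in step B, so C is guaranteed to run after A and B regardless of how the goroutines are scheduled.

```go
package main

import "fmt"

type Task struct {
	Params     interface{}
	ResultChan chan []byte
}

// run returns the results in the order they were sent.
func run() []string {
	ready := make(chan *Task) // hypothetical: hands over the initialized task

	go func() {
		// A: allocate the task and its result channel.
		t := &Task{Params: "p", ResultChan: make(chan []byte, 2)}
		// B: hand the initialized task to the receiver.
		ready <- t
		t.ResultChan <- []byte("r1")
		t.ResultChan <- []byte("r2")
		close(t.ResultChan)
	}()

	// C: blocks until B has sent the task, so the task is never nil here.
	t := <-ready
	var got []string
	for item := range t.ResultChan {
		got = append(got, string(item))
	}
	return got
}

func main() {
	fmt.Println(run()) // prints "[r1 r2]"
}
```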

If it were me, I would use this pattern:

// Task describes one task.
type Task struct {
    Key    keyType
    Params interface{}
}

var ResultChan = make(chan []byte)
var Wg sync.WaitGroup

// Start one goroutine per task.
for _, task := range taskSlice {
    Wg.Add(1)
    go task.Do(ResultChan, &Wg)
}

// Receive the results.
go func() {
    for item := range ResultChan {
        log.Debug("Receive data From ResultChan : ", string(item))
    }
    log.Debug("Process ", tools.REQUEST_BAOJIE, " end ")
}()

// Wait for all tasks to finish, then close the channel.
Wg.Wait()
close(ResultChan)
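
A self-contained version of this pattern, as a sketch. The Do method body, the string Key type, and the runTasks function are assumptions made so that the example compiles. It also adds a done channel so that the caller waits for the receiving goroutine to drain the results before returning, which the snippet above does not do.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

type Task struct {
	Key    string
	Params interface{}
}

// Do is a hypothetical implementation: it sends one result and then
// reports completion on the WaitGroup.
func (t Task) Do(out chan<- []byte, wg *sync.WaitGroup) {
	defer wg.Done()
	out <- []byte(fmt.Sprintf("%s:%v", t.Key, t.Params))
}

func runTasks(tasks []Task) []string {
	resultChan := make(chan []byte)
	var wg sync.WaitGroup

	// Start one goroutine per task.
	for _, task := range tasks {
		wg.Add(1)
		go task.Do(resultChan, &wg)
	}

	// Receive the results until the channel is closed.
	done := make(chan []string)
	go func() {
		var got []string
		for item := range resultChan {
			got = append(got, string(item))
		}
		done <- got
	}()

	// Wait for all tasks, close the channel, then wait for the receiver.
	wg.Wait()
	close(resultChan)
	got := <-done
	sort.Strings(got) // completion order is not deterministic
	return got
}

func main() {
	fmt.Println(runTasks([]Task{{Key: "a", Params: 1}, {Key: "b", Params: 2}}))
}
```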