The problem of writing gzip files with multiprocessing

Problem description

In multiprocessing, multiple processes are started to write multiple files. After the run finishes, the *.gz files exist on the hard disk, but their content is empty.

Background and what I have tried

When the input gzip file is small, the output gzip is empty.
When the input gzip file is large, the output gzip is normal.

Related code

I want to write the results randomly into 10 gzip files, so I first create a list to hold the file handles:

import gzip

# open ten gzip output files in text mode
gzip_files = []
for i in range(10):
    gzip_files.append(gzip.open(str(i) + ".gz", "wt"))

Pick suitable reads from a file and write each one to a randomly chosen gzip file:

from random import randint
from Bio import SeqIO

def choose_read_gz(file, gzip_files, cutoff, read_format):
    # keep reads longer than cutoff and write each one to a random output file
    with gzip.open(file, "rt") as handle:
        for read in SeqIO.parse(handle, read_format):
            if len(read.seq) > cutoff:
                gzip_files[randint(0, 9)].write(read.format(read_format))

With multiple input files, start multiple processes:

from multiprocessing import Pool

with Pool(16) as pool:
    for file in files:
        pool.apply_async(choose_read_gz, args=(file, gzip_files, cutoff, read_format))
    pool.close()
    pool.join()

Finally, close the files:

for gzip_file in gzip_files:
    gzip_file.close()

What result do you expect? What error do you actually see?

  1. When the input gzip file is small, the output gzip content is empty and the file is only 27 B.
  2. When the input gzip file is large, the output gzip content is normal.
  3. When flush is called after every write, the output gzip is normal and the content is complete:
def choose_read_gz(file, gzip_files, cutoff, read_format):
    with gzip.open(file, "rt") as handle:
        for read in SeqIO.parse(handle, read_format):
            if len(read.seq) > cutoff:
                filehandle = gzip_files[randint(0, 9)]
                filehandle.write(read.format(read_format))
                filehandle.flush()  # force the buffered data out to disk
  4. When flush and close are only called on all handles at the end, the output gzip content is empty:
for gzip_file in gzip_files:
    gzip_file.flush()
    gzip_file.close()

Why do I have to force a flush after every write for the data to reach the hard disk, and why doesn't closing the file handles at the end write the data out?


You should avoid passing file objects between processes. Each worker process ends up working on its own copy of the handle and its own in-memory buffer, so data buffered in a worker is never flushed by the parent's close(); only writes large enough to spill the buffer, or an explicit flush() in the worker, ever reach the disk. Instead, use a producer/consumer pattern and hand the data through a Queue to a single writer.
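For illustration, here is a minimal sketch of the failure mode. It assumes the Unix fork start method and uses Process so the handle is inherited rather than pickled; the file name demo.gz and the function child_write are made up for this example:

import gzip
import os
from multiprocessing import Process

def child_write(handle):
    # The forked child works on its own copy of the handle's buffers,
    # so this write stays in the child's memory.
    handle.write("written in a child process\n")
    # Without handle.flush() here, the buffered text is discarded when the
    # child exits, because the child never flushes inherited file objects.

if __name__ == "__main__":
    gz = gzip.open("demo.gz", "wt")

    p = Process(target=child_write, args=(gz,))
    p.start()
    p.join()

    gz.close()  # flushes only the parent's empty buffer: header + trailer
    print(os.path.getsize("demo.gz"))    # a few dozen bytes
    with gzip.open("demo.gz", "rt") as f:
        print(repr(f.read()))             # '' -- the child's write never reached disk

This is consistent with the symptoms above: only when a worker writes enough to spill its own buffer (a large input file), or calls flush() itself, does any data reach the shared file.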

The following example shows multiple processes producing data and putting it into a Queue, while a separate process reads from the queue and writes to the gzip file.

# -*- coding: utf-8 -*-
def produce(q):
    # producer: push a few text lines onto the shared queue
    for i in range(10):
        q.put(f'=={i}==\n')


def consume(q):
    # consumer: the single process that writes to the gzip file
    import gzip
    with gzip.open('a.gz', 'wt') as f:
        while True:
            data = q.get()
            if not data:
                break
            f.write(data)


def main():
    from multiprocessing import Pool, Manager
    q = Manager().Queue()

    # a dedicated single-worker pool owns the output file
    consumePool = Pool(1)
    consumePool.apply_async(consume, args=(q,))

    with Pool(2) as pool:
        pool.apply_async(produce, args=(q,))
        pool.close()
        pool.join()

    q.put('')  # empty string signals the consumer to stop
    consumePool.close()
    consumePool.join()


if __name__ == "__main__":
    main()
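To map this back onto the original task, here is a sketch along the same lines (the names produce_reads and write_reads are made up; Bio.SeqIO and the files, cutoff and read_format values are assumed from the question). Producers only parse input and put text onto the queue; a single consumer owns all ten output handles:

import gzip
from random import randint
from multiprocessing import Pool, Manager
from Bio import SeqIO

def produce_reads(q, file, cutoff, read_format):
    # producer: parse one input file and push qualifying reads as text
    with gzip.open(file, "rt") as handle:
        for read in SeqIO.parse(handle, read_format):
            if len(read.seq) > cutoff:
                q.put(read.format(read_format))

def write_reads(q):
    # consumer: the only process that ever touches the output files
    out = [gzip.open(str(i) + ".gz", "wt") for i in range(10)]
    while True:
        data = q.get()
        if not data:               # empty string is the stop signal
            break
        out[randint(0, 9)].write(data)
    for f in out:
        f.close()                  # a single close here flushes everything

def main(files, cutoff, read_format):
    q = Manager().Queue()

    writer = Pool(1)
    writer.apply_async(write_reads, args=(q,))

    with Pool(16) as pool:
        for file in files:
            pool.apply_async(produce_reads, args=(q, file, cutoff, read_format))
        pool.close()
        pool.join()

    q.put("")                      # all producers are done; stop the consumer
    writer.close()
    writer.join()

Because no file handle ever crosses a process boundary, there is no stranded buffer, and one close at the end is enough.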