Implementation of Python Asynchronous Coroutines

A JSON file with millions of lines needs to be cleaned, and the cleaned, structured data re-saved as a CSV file. I tried using a pandas DataFrame to collect the cleaned items, but found that cleaning and writing one item at a time is far too slow, with most of the time spent on each data write. So I defined an async writeline and used an async readline to spawn 100 coroutines that process 100 rows of data at a time. However, the test result is no different from plain sequential processing: the average processing time per item is still about 0.5 s. I suspect something is wrong with my async writeline and would appreciate some pointers.

The test code is as follows:

import pandas as pd
import json
import time
import asyncio

def trop():
    tropicos = pd.DataFrame()
    with open(r"/tropicosbase.json", "r") as yn:
        count = 0
        tropicos["tag"] = None
        tropicos.loc[0] = None
        async def readline(line):
            nonlocal count
            js = json.loads(line)
            await writeline(js, tropicos, count)
            count += 1
            tropicos.loc[count] = None
        cs = yn.readlines()[:100]
        tasks = [asyncio.ensure_future(readline(line)) for line in cs]
        loop = asyncio.get_event_loop()
        start = time.time()
        loop.run_until_complete(asyncio.wait(tasks))
        end = time.time()
        print(end - start)

    tropicos.to_csv(r"/tropicos.csv", index=None)


async def writeline(js, tropicos, count):
    for k, v in js.items():
        try:
            tropicos[k][count] = v
        except KeyError:
            if k == "detailsdiv":
                pass
            else:
                tropicos[k] = pd.Series()
                tropicos[k][count] = v

trop()

There is no advantage in using asyncio for this problem; asyncio pays off in I/O-intensive operations, while the work here (json.loads plus DataFrame assignment) is CPU-bound, so the coroutines never actually yield to each other and simply run one after another.
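As a rough, self-contained illustration (none of this code is from the question; asyncio.sleep stands in for genuine I/O), coroutines that really await something overlap their waits, while CPU-bound coroutines run back to back:

import asyncio
import time

async def io_bound(i):
    # Genuinely waits; 100 of these overlap on one event loop.
    await asyncio.sleep(0.5)

async def cpu_bound(i):
    # Pure computation; it never yields control, so tasks run back to back.
    sum(range(10**6))

async def run_all(coro):
    start = time.time()
    await asyncio.gather(*(coro(i) for i in range(100)))
    print(coro.__name__, time.time() - start)

asyncio.run(run_all(io_bound))   # finishes in roughly 0.5 s total
asyncio.run(run_all(cpu_bound))  # takes roughly 100 x the single-task time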

You should start from the simplest implementation, and then use cProfile to identify the performance bottlenecks.

Refer to https://docs.python.org/3/library/profile.html#module-cProfile
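For instance, a minimal sketch of profiling a plain sequential version (the path comes from the question; the cleaning body is a placeholder to be replaced with the real logic):

import cProfile
import json
import pstats

def clean_all(path):
    # Sequential baseline with no asyncio: parse and clean every line.
    rows = []
    with open(path, "r") as fh:
        for line in fh:
            rows.append(json.loads(line))   # substitute the real cleaning here
    return rows

# Run under the profiler, save the stats, then print the 20 costliest calls.
cProfile.run('clean_all("/tropicosbase.json")', "clean.prof")
pstats.Stats("clean.prof").sort_stats("cumulative").print_stats(20)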


In this case, multiprocessing should be used instead of multithreading or coroutines.
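A minimal sketch with multiprocessing.Pool, assuming every JSON line can be cleaned independently (clean_line is a hypothetical helper, not code from the question):

import json
from multiprocessing import Pool

def clean_line(line):
    # CPU-bound per-line work runs in a separate worker process.
    js = json.loads(line)
    js.pop("detailsdiv", None)   # drop the key the question skips
    return js

if __name__ == "__main__":
    with open("/tropicosbase.json", "r") as fh:
        lines = fh.readlines()
    # A reasonably large chunksize keeps the inter-process overhead per line low.
    with Pool() as pool:
        cleaned = pool.map(clean_line, lines, chunksize=1000)
    print(len(cleaned), "lines cleaned")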


Personal experience:
try to keep the results in memory and reduce the number of file writes.
There is a big difference between writing 10,000 lines once and writing one line 10,000 times.
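For example, a sketch that keeps every cleaned row in a list and writes the CSV once at the end (paths match the question; pandas infers the columns from the dict keys):

import json
import pandas as pd

rows = []
with open("/tropicosbase.json", "r") as fh:
    for line in fh:
        js = json.loads(line)
        js.pop("detailsdiv", None)   # ignore this key, as the question does
        rows.append(js)              # accumulate in memory, no per-row writes

# One DataFrame construction and one file write for the whole dataset.
pd.DataFrame(rows).to_csv("/tropicos.csv", index=None)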
