How do I distribute acquired data across workers in Python?

For example, when I use Kafka or RabbitMQ to get data from a message queue, the source is a single queue, so the load cannot be split at the data source. The processing is computationally heavy, though, so the data needs to be fanned out internally. For example, my message-processing class is:

class Worker(threading.Thread):
    def __init__(self):
        super().__init__()          # must initialize threading.Thread
        self.raw = []

    def run(self):
        while True:
            if self.raw:
                d = self.raw.pop()
                # ... process d here ...

Then, while the program is running, I create several Worker instances and push the data coming from the message source into their raw attributes. The question is: how can the data be distributed at relatively low cost?
Is the multithreading idea correct? I have tested this before, and the throughput of Python's built-in Queue did not seem very high.
In addition, if I want workers to be created flexibly, should that be done in the Python code itself? That is, when the data flow is large I create some additional Workers, and when it decreases I destroy some Workers.
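A thread-safe version of the dispatch idea above can be sketched with the standard-library `queue.Queue`, which blocks instead of busy-waiting and avoids the race on a bare list. The round-robin policy and the `results` list are illustrative stand-ins for real processing:

```python
import itertools
import queue
import threading

results = []                        # collected output, guarded by a lock
results_lock = threading.Lock()


class Worker(threading.Thread):
    """Pulls items from its own thread-safe queue instead of a bare list."""

    def __init__(self):
        super().__init__(daemon=True)
        self.raw = queue.Queue()

    def run(self):
        while True:
            d = self.raw.get()      # blocks until an item arrives
            if d is None:           # sentinel value: shut down
                break
            with results_lock:
                results.append(d)   # stand-in for real processing


workers = [Worker() for _ in range(3)]
for w in workers:
    w.start()

# Round-robin dispatch of incoming messages to the workers.
rr = itertools.cycle(workers)
for msg in ["a", "b", "c", "d"]:    # stand-in for the message source
    next(rr).raw.put(msg)

for w in workers:
    w.raw.put(None)                 # ask each worker to stop
for w in workers:
    w.join()
```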


Finally, after reading some documentation, I found that if compute-intensive operations are involved, choosing Python in the first place is probably unwise, and the design of Queue does not account for heavy traffic either. If this question has no answer, the question itself is most likely wrong.


For CPython, CPU-bound operations should use multiple processes instead of multithreading.

In this case there are two places where the fan-out can happen: one, as you mentioned, is to distribute the data after receiving it; the other is to make use of the messaging framework itself.
Take RabbitMQ as an example. You can refer to the Work Queues tutorial: https://www.rabbitmq.com/tutorials/tutorial-two-python.html


As for other concerns such as throttling and a flexible worker pool, it may be better to put them aside, get the functionality working first, and then optimize against measured bottlenecks; that gives twice the result for half the effort.

On optimizing Python performance, there is an article you can refer to: https://pypy.org/performance.html


If you decide the workload really is compute-intensive, multithreading in Python is indeed unsuitable, but you can consider multiple processes. You do not need to build your own queue for internal fan-out; and where a Queue is genuinely needed, you should limit its traffic by setting a maximum size.
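The size limit acts as back-pressure, which can be sketched with a bounded standard-library queue (the sizes and messages here are illustrative):

```python
import queue

q = queue.Queue(maxsize=2)   # bounded: at most 2 items in flight

q.put("msg-1")
q.put("msg-2")

try:
    # With the queue full, a non-blocking put fails immediately;
    # a plain q.put() would block until a consumer calls get().
    q.put_nowait("msg-3")
except queue.Full:
    print("queue full, producer must wait")

print(q.get())   # consuming "msg-1" frees a slot
q.put("msg-3")   # now succeeds
```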

Take RabbitMQ as an example again: https://www.rabbitmq.com/tutorials/tutorial-two-python.html

In the official RabbitMQ example, pika is used as the RabbitMQ client, and its message model should match yours. Slightly modifying the official worker.py example to consume messages with several RabbitMQ clients in parallel:

#!/usr/bin/env python
import time
from concurrent.futures import ProcessPoolExecutor
# from concurrent.futures import ThreadPoolExecutor

import pika

NUM_CONSUMERS = 3


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(0.2)  # simulate some work per message
    print(" [x] Done: %s" % ch.channel_number)
    ch.basic_ack(delivery_tag=method.delivery_tag)


def start_consumer(worker_id):
    # Each process opens its own connection: pika connections and
    # channels are not safe to share across processes (or threads).
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)
    channel.basic_qos(prefetch_count=1)  # one unacked message at a time
    channel.basic_consume(queue='task_queue', on_message_callback=callback)
    channel.start_consuming()


if __name__ == '__main__':
    print(' [*] Waiting for messages. To exit press CTRL+C')

    with ProcessPoolExecutor(max_workers=NUM_CONSUMERS) as e:
        e.map(start_consumer, range(NUM_CONSUMERS))

    # with ThreadPoolExecutor(max_workers=NUM_CONSUMERS) as e:
    #     e.map(start_consumer, range(NUM_CONSUMERS))

Flexible creation of workers is, I think, hard to implement from inside the program (worker.py) but easier from outside it. First, monitor the traffic: when it increases, speed up consumption by launching more worker.py processes; conversely, reduce the number of running worker.py instances when it drops.
