(Truly) Parallel Computing
The grpc library for Python has some strange quirks which make multiprocessing rather fraught. Quoting the official grpc examples repo:
The library is implemented as a C extension, maintaining much of the state that drives the system in native code. As such, upon calling
fork, any threads in a critical section may leave the state of the gRPC library invalid in the child process. See this excellent research paper for a thorough discussion of the topic. Calling
fork without exec in your process is supported before any gRPC servers have been instantiated. Application developers can take advantage of this to parallelize their CPU-intensive operations.
A Fleshed-Out Example
Below is a simple grpc server which accepts requests in a single process and offloads CPU-intensive work to multiple worker processes via a thread-safe queue. It receives completion updates from the worker processes via another queue and makes those available to consumers via a streaming RPC.
server.py
import asyncio
import grpc
from google.protobuf import empty_pb2
from google.protobuf.json_format import MessageToDict
from com.groundsfam.dosomething.v1 import do_something_pb2_grpc
class DoSomethingServiceServicer(do_something_pb2_grpc.DoSomethingServiceServicer):
    """Bridges gRPC requests to worker processes via multiprocessing queues.

    work_queue carries incoming work items to the workers; update_queue
    carries their results back for streaming to consumers.
    """

    def __init__(self, work_queue, update_queue):
        # Both queues are multiprocessing.Queue instances shared with the
        # worker processes spawned in main.py.
        self.work_queue = work_queue
        self.update_queue = update_queue

    async def doSomething(self, request, context):
        """Client-streaming RPC: enqueue every incoming item for the workers."""
        try:
            # grpc.aio delivers a client-streaming request as an *async*
            # iterator; a plain `for` loop would fail at runtime.
            async for item in request:
                self.work_queue.put(MessageToDict(item))
        except Exception:
            # handle errors
            pass
        # Return here rather than from a `finally` block: a `return` inside
        # `finally` would also swallow CancelledError when the RPC is
        # cancelled, hiding the cancellation from the framework.
        return empty_pb2.Empty()

    async def getUpdates(self, request, context):
        """Server-streaming RPC: forward worker results as they arrive."""
        while True:
            try:
                # empty()/get() is racy with multiple consumers, but this
                # server is the queue's only consumer, so get() should not
                # block here. TODO confirm no other consumer exists.
                while not self.update_queue.empty():
                    update = self.update_queue.get()
                    yield turn_into_grpc_message_somehow(update)
            except Exception:
                # handle errors
                pass
            # Delay so we don't spin the CPU while the queue is empty.
            # Kept outside try/finally: awaiting inside `finally` can raise
            # while the async generator is being closed.
            await asyncio.sleep(0.1)
def serve(work_queue, update_queue):
    """Start the gRPC server and block until it terminates.

    Intended to run as the body of a dedicated process; work_queue and
    update_queue are multiprocessing queues shared with the workers.
    """
    async def run_server():
        server = grpc.aio.server()
        do_something_pb2_grpc.add_DoSomethingServiceServicer_to_server(
            DoSomethingServiceServicer(work_queue, update_queue), server)
        # Plain string literal: the original used an f-string prefix with
        # nothing to interpolate.
        server.add_insecure_port('[::]:8080')
        await server.start()
        await server.wait_for_termination()
    # above wrapper function makes it easy to ensure we only call this once
    # in this process
    asyncio.run(run_server())

worker.py
import asyncio
from somelib import cpu_intensive_work
def worker(work_queue, update_queue):
    """Consume items from work_queue forever, pushing results to update_queue.

    Intended to run as the body of a dedicated worker process.
    """
    async def work_loop():
        while True:
            # blocks this process until we get something to work on
            item = work_queue.get()
            update_queue.put(await cpu_intensive_work(item))
    # The wrapper coroutine makes it easy to guarantee asyncio.run() is
    # invoked exactly once in this process.
    asyncio.run(work_loop())

main.py
import multiprocessing
from multiprocessing import Queue
from worker import worker
from server import serve
MAX_SIZE = 10000
def main():
    """Spawn one gRPC-server process plus cpu_count()-1 worker processes."""
    work_queue = Queue(maxsize=MAX_SIZE)
    update_queue = Queue(maxsize=MAX_SIZE)
    # The first process runs the server; every remaining core gets a worker.
    targets = [serve] + [worker] * (multiprocessing.cpu_count() - 1)
    processes = []
    for target in targets:
        proc = multiprocessing.Process(
            target=target,
            args=(work_queue, update_queue)
        )
        # marking the process as daemon means it cannot spawn child
        # processes, and it will be killed when the parent process ends
        proc.daemon = True
        proc.start()
        processes.append(proc)
    # block until all processes have ended
    for proc in processes:
        proc.join()

if __name__ == '__main__':
    main()