Actors demo
From foyono
Jump to navigation · Jump to search
>>> from concurrent.futures import ThreadPoolExecutor
>>> from splut.actor import Join, Spawn
>>> from urllib.request import urlopen
>>> import sys
>>>
>>> class Pipe:
...
... traffic = 0
...
... def __init__(self, index):
... self.index = index
...
... def fetch(self, url):
... def log(message):
... print(f"[{self.index}] [{url}] {message}", file = sys.stderr) # Demonstrate worker (and thus thread) utilisation.
... log('Fetch.')
... with urlopen(url) as f:
... data = f.read()
... n = len(data)
... log(f"Got: {n}")
... self.traffic += n # Effectively self is locked, so this is thread-safe.
... return data
...
>>> class Sum:
...
... def __init__(self, pipeactor):
... self.pipeactor = pipeactor
...
... async def bytecount(self, dotcoms):
... futures = [self.pipeactor.fetch(f"https://www.{dotcom}.com/") for dotcom in dotcoms] # Collect futures eagerly.
... return sum(map(len, await Join(futures)))
...
>>> pipes = [Pipe(i) for i in range(3)]
>>> with ThreadPoolExecutor() as e:
... spawn = Spawn(e)
... pipeactor = spawn(*pipes) # One mailbox, multiple independent workers.
... sumactor = spawn(Sum(pipeactor)) # An actor is cheap to create, unlike a thread.
... bytecount = sumactor.bytecount(['facebook', 'github', 'google', 'tumblr', 'youtube']).wait()
The total number of bytes piped by the workers matches the byte count returned by the sum actor:
>>> sum(p.traffic for p in pipes) == bytecount
True