Actors demo

From foyono
Jump to navigationJump to search
>>> from concurrent.futures import ThreadPoolExecutor
>>> from splut.actor import Join, Spawn
>>> from urllib.request import urlopen
>>> import sys
>>>
>>> class Pipe:
...
...     traffic = 0
...
...     def __init__(self, index):
...         self.index = index
...
...     def fetch(self, url):
...         def log(message):
...             print(f"[{self.index}] [{url}] {message}", file = sys.stderr) # Demonstrate worker (and thus thread) utilisation.
...         log('Fetch.')
...         with urlopen(url) as f:
...             data = f.read()
...         n = len(data)
...         log(f"Got: {n}")
...         self.traffic += n # Effectively self is locked, so this is thread-safe.
...         return data
...
>>> class Sum:
...
...     def __init__(self, pipeactor):
...         self.pipeactor = pipeactor
...
...     async def bytecount(self, dotcoms):
...         futures = [self.pipeactor.fetch(f"https://www.{dotcom}.com/") for dotcom in dotcoms] # Collect futures eagerly.
...         return sum(map(len, await Join(futures)))
...
>>> pipes = [Pipe(i) for i in range(3)]
>>> with ThreadPoolExecutor() as e:
...     spawn = Spawn(e)
...     pipeactor = spawn(*pipes) # One mailbox, multiple independent workers.
...     sumactor = spawn(Sum(pipeactor)) # An actor is cheap to create, unlike a thread.
...     bytecount = sumactor.bytecount(['facebook', 'github', 'google', 'tumblr', 'youtube']).wait()

Total bytes piped by workers matches the total we have:
>>> sum(p.traffic for p in pipes) == bytecount
True