Sunday 15 February 2015

Python profiling with cProfile - Part II: Asynchronous programming with Twisted

Continuing the previous post on Python profiling, I'm moving on to the optimisation of concurrent and asynchronous software.

The task for today is: find patterns within a set of web pages.
More specifically, given a set {X} of URLs and a set {Y} of patterns, retrieve the contents of each URL in {X}, and find the subset of {Y} that appears in it.

For example, if X is:

{http://www.google.com}

and Y is:

{Google, NoSuchPattern}

then the result is:

{http://www.google.com: [Google]}

Since I'd like to show off both asynchronous and parallel optimisation, we'll use the top 4 Alexa domains as the list of URLs:

   http://www.google.com
   http://www.facebook.com
   http://www.youtube.com
   http://www.yahoo.com


As for the list of patterns, I've simply taken the first 4000 lines of /usr/share/dict/words.

Let's start with a straightforward serial attempt, and then start optimising.

(Also, just to put the usual disclaimer: error handling, parameter validation etc. are omitted to save space)


Serial fetch and scan

import sys, urllib

def getMatchingPatterns(patterns, text):
   return filter(text.__contains__, patterns)

def serialScan(urls, patterns):
   return zip(urls, [getMatchingPatterns(patterns, urllib.urlopen(url).read()) for url in urls])

if __name__ == "__main__":
   with open(sys.argv[1]) as urlFile:
      urls = urlFile.read().split()
   with open(sys.argv[2]) as patternsFile:
      patterns = patternsFile.read().split()

   resultTuple = serialScan(urls, patterns)
   for url, patterns in resultTuple:
      print ': '.join([url, ','.join(patterns)])

The algorithm is as simplistic as it gets: just fetch the URLs serially and scan them one by one.
Before we move on, it's worth briefly zooming in on the use of list comprehensions here:
def getMatchingPatterns(patterns, text):
   return filter(text.__contains__, patterns)

def serialScan(urls, patterns):
   return zip(urls, [getMatchingPatterns(patterns, urllib.urlopen(url).read()) for url in urls])


urllib.urlopen fetches a URL synchronously, we read the content entirely into memory, and then filter the list of patterns with str.__contains__.
This returns the list of patterns that appear in the URL's content, and we pair it with the URL itself using zip.
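
To make that concrete, here is a toy snippet (the text and patterns are made up for illustration, not taken from the program):

>>> patterns = ['Google', 'NoSuchPattern']
>>> filter('I searched Google today'.__contains__, patterns)
['Google']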

Here is the output and timing:

roman@localhost:~/blog/prAv$ time python  serialFetch.py urls.txt words.txt 
http://www.google.com: A,Ag,At,Au,Australia,B,Ba,C,Ca,Chi,Cl,Co,Col,Colo,Cr,Crick,Cu,D
http://www.facebook.com: A,Ac,Ag,Al,Am,Apr,Ar,Art,As,At,Au,Aug,Av,B,Ba,Bi,Bk,Br,Brazil,C,Ca,Cal,Capt,Cd,Che,Chi,China,Chinese,Cl,Co,Com,Cook,Cr,Cs,Cu,D,Day,Dec
http://www.youtube.com: A,Ac,Advent,Ag,Al,Ali,Alice,Alyson,Andersen,Apr,April,Ar,As,At,Atwood,Au,Audi,Aug,August,Av,B,Ba,Ball,Be,Ben,Best,Bi,Bk,Br,Bran,Brit,Britain,Bruce,C,Ca,Cal,Carter,Cf,Che,Chi,Ci,Cl,Co,Col,Collin,Com,Conan,Cr,Creator,Cs,Cu,D,Day,Dec,December
http://www.yahoo.com: A,Abe,Ac,Adidas,Africa,Ag,Al,Am,Amazon,America,American,Ann,Anna,Apple,Aquarius,Ar,Argentina,Aries,Art,Arthur,As,Asia,At,Au,Aug,Australia,Av,B,Ba,Banks,Barney,Bart,Be,Beau,Begin,Belfast,Belgium,Best,Bi,Bill,Bk,Blake,Blu,Bob,Bobby,Boru,Br,Bran,Brazil,Brit,British,Briton,Britons,Brits,Brooke,Brown,Bud,C,Ca,Cal,Camel,Canada,Cancer,Capri,Capricorn,Carney,Case,Cd,Cf,Chan,Chang,Che,Chi,Chile,Chris,Church,Ci,Cl,Cm,Co,Col,Colo,Colombia,Com,Cook,Cr,Cs,Cu,D,Dale,Damian,Day,Dec,December

real    0m7.107s
user    0m3.484s
sys     0m0.040s


There are two interesting parts to this timing:

(a) We spent about 3.5 seconds in user space. It isn't far-fetched to assume that most of this went on the string matching.
(b) The remaining ~3.6 seconds was spent retrieving network resources.

Let's first focus on (b), and look at Twisted.


Parallel fetch, serial scan


Twisted provides quite a few constructs, mostly focused on networking and asynchronous programming, and it's the latter that I'm going to zoom in on.

What we'd like to do is fetch all the URLs in parallel, and once they have all been retrieved, scan them using the same serial algorithm. This should reduce the 3.6-second network cost we saw in the first, simple attempt.

(Note: Yes, it is more efficient to scan URLs as and when they are retrieved, rather than wait for all of them. We sacrifice a bit of performance for the sake of clarity, but we'll come back to this later on)

The main mechanism for this is the Deferred, which essentially says: "I do not have a result for you right now, but I'll call your function when I'm ready". In other languages, Deferreds are known as futures or promises.
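
Just to make the idea concrete, here is a minimal standalone sketch (not part of our program; the one-second delay and the value 42 are made up): we create a Deferred, register a callback on it, and have the reactor fire it a little later:

from twisted.internet import defer, reactor

def printResult(result):
   print 'Got result:', result
   reactor.stop()

d = defer.Deferred()                    # no result yet
d.addCallback(printResult)              # run this once the result arrives
reactor.callLater(1, d.callback, 42)    # fire the Deferred after one second
reactor.run()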

Whatever they are called and whichever language they are used in, they are a cornerstone of asynchronous programming: if we have a resource we can use in parallel, such as the network, I/O, or another CPU core, then asynchronous, event-driven code is always an option. Here we can use it to fetch multiple URLs at the same time:


from twisted.internet import reactor, defer
from twisted.web import client
import sys

def printResults(resultTuple):
   for url, patterns in resultTuple:
      print ': '.join([url, ','.join(patterns)])

   reactor.stop()

def gotAllPages(results, urls, patterns):
   matchingPatterns = [filter(result[1].__contains__, patterns) for result in results]
   printResults(zip(urls, matchingPatterns))

def parallelFetchAndScan(urls, patterns):
   deferreds = [client.getPage(url) for url in urls]
   defer.DeferredList(deferreds).addCallback(gotAllPages, urls, patterns)

if __name__ == "__main__":
   with open(sys.argv[1]) as urlFile:
      urls = urlFile.read().split()
   with open(sys.argv[2]) as patternsFile:
      patterns = patternsFile.read().split()

   parallelFetchAndScan(urls, patterns)
   reactor.run()

This is quite a jump from the previous version, so let's go over it.


Firstly, the flow of the program is not sequential, but event-driven. We start the reactor with reactor.run(); the reactor is responsible for listening for events and triggering callbacks. Once we have gathered all the results, we stop it with reactor.stop(), which ends the program.

In parallelFetchAndScan, we invoke twisted.web.client.getPage, which, unlike urllib.urlopen, returns immediately with a Deferred. We could register a callback on each Deferred individually; however, to keep things simple for now, we'd like to be notified only when all the results are available, so we combine them into a DeferredList. This gives us a single event once all the asynchronous fetches have completed.
We register our callback on the DeferredList using addCallback, and make sure the callback also receives the urls and patterns parameters. Note that a DeferredList hands its callback a list of (success, result) tuples, which is why gotAllPages picks out result[1] to get each page's content.
The actual pattern matching is the same as before; this part we have yet to optimise.

So, with all of that, did things improve?

roman@localhost:~/blog/prAv$ time python parallelFetch.py urls.txt words.txt 
real    0m4.996s
user    0m3.988s
sys     0m0.068s


We did save 2 seconds, and now we spend most of the time in pattern matching.

We can try taking it down further by scanning page content as it arrives, rather than waiting for all pages to be downloaded:

from twisted.internet import reactor, defer
from twisted.web import client
import sys
results = {}

def printResults(ignore_res):
   for url, patterns in results.iteritems():
      print ': '.join([url, ','.join(patterns)])

   reactor.stop()

def gotPage(pageContent, url, patterns):
   matchingPatterns = filter(pageContent.__contains__, patterns)
   results[url] = matchingPatterns

def parallelFetchAndScan(urls, patterns):
   deferreds = []
   for url in urls:
      d = client.getPage(url).addCallback(gotPage, url, patterns)
      deferreds.append(d)

   defer.DeferredList(deferreds).addCallback(printResults)

if __name__ == "__main__":
   with open(sys.argv[1]) as urlFile:
      urls = urlFile.read().split()
   with open(sys.argv[2]) as patternsFile:
      patterns = patternsFile.read().split()

   parallelFetchAndScan(urls, patterns)
   reactor.run()


Here, we use the per-URL gotPage callback to extract matching patterns, and use the DeferredList event only to print the results and stop the program. I also reluctantly used a global variable for the results; in real life, of course, I would have encapsulated this within an object.
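
For illustration, that encapsulation might look something like this (a hypothetical sketch; the class and method names are mine, not from the actual program):

from twisted.internet import reactor

class ScanResults(object):
   """Holds per-URL matches instead of a module-level global."""
   def __init__(self):
      self.results = {}

   def gotPage(self, pageContent, url, patterns):
      self.results[url] = filter(pageContent.__contains__, patterns)

   def printAndStop(self, ignore_res):
      for url, patterns in self.results.iteritems():
         print ': '.join([url, ','.join(patterns)])
      reactor.stop()

The callbacks would then be registered on an instance of this class rather than on module-level functions.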

Did scanning progressively improve things?
roman@localhost:~/blog/prAv$ time python parallelFetchProgressiveScan.py urls.txt words.txt
real    0m4.689s
user    0m3.920s
sys     0m0.116s


Yes, but not by much. It is time to look at how we optimise the actual matching.
(By the way, I have removed the match lists from the output above; you can take my word for it, or run the program, to confirm they are consistent.)


Parallel fetch, optimised serial scan

Time to run the profiler. This time, rather than adding profiler calls within the program itself, I'm using the following command line:

python -m cProfile -o profilerResult.txt parallelFetchProgressiveScan.py urls.txt words.txt

 

There are two reasons for that:

a) The program is more complicated, and the profiler output is prohibitively long. So, we need to do post-processing of the profiler result.

b) There is no single entry point any longer: we have the reactor in addition to the initial page-fetching calls.

Then, I use the following small Python script to extract the top 10 entries by tottime - these are the functions we spent the most time in, not counting subroutines (see the previous post).

import pstats
import sys
p=pstats.Stats(sys.argv[1])
p.sort_stats('tottime').print_stats(10) 
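
As an aside, pstats can slice the same dump in other ways too. For instance (a small sketch against the same result file), we can strip directory prefixes, sort by cumulative time, or list the callers of a particular function:

import pstats, sys

p = pstats.Stats(sys.argv[1])
p.strip_dirs().sort_stats('cumulative').print_stats(10)   # top 10 by cumulative time
p.print_callers('filter')                                 # who calls functions matching 'filter'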


In general, pstats is quite a handy utility to post-process profiling results. Here, the output is:


roman@localhost:~/blog/prAv$ python topTotTime.py profilerResult.txt
Sun Feb 15 12:33:17 2015    profilerResult.txt

         128752 function calls (126060 primitive calls) in 5.449 seconds

   Ordered by: internal time
   List reduced from 1554 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        5    3.652    0.730    3.652    0.730 {filter}
      429    0.969    0.002    0.969    0.002 {method 'poll' of 'select.epoll' objects}
      724    0.031    0.000    0.031    0.000 {method 'recv' of 'OpenSSL.SSL.Connection' objects}
       87    0.030    0.000    0.030    0.000 {method 'send' of 'OpenSSL.SSL.Connection' objects}
   252/47    0.027    0.000    0.077    0.002 /usr/lib/python2.7/sre_parse.py:379(_parse)
  439/438    0.020    0.000    0.119    0.000 /usr/lib/python2.7/dist-packages/zope/interface/interface.py:222(changed)
     4279    0.017    0.000    0.024    0.000 /usr/lib/python2.7/dist-packages/zope/interface/interface.py:545(__hash__)
    10528    0.016    0.000    0.016    0.000 {method 'append' of 'list' objects}
   530/44    0.016    0.000    0.048    0.001 /usr/lib/python2.7/sre_compile.py:32(_compile)
     8166    0.014    0.000    0.014    0.000 {getattr}



The result is unsurprising: our CPU time is dominated by the filter call in gotPage. It was more or less expected, but it's always worth confirming suspicions when optimising.
Of course, the algorithm is inefficient to say the least: its complexity is O(|X| * |Y|), or in simpler terms, we scan each page once for every single pattern.
It'd be great to do smarter matching using state machines, where we look for patterns such as Twist and Twisted simultaneously as we iterate through the string. 
And this is exactly what regular expressions do! So the plan is straightforward: construct one gigantic regex or-ing together all the patterns, compile it, and match.

Let's try that:
from twisted.internet import reactor, defer
from twisted.web import client
import sys, re
results = {}

def printResults(ignore_res):
   for url, patterns in results.iteritems():
      print ': '.join([url, ','.join(patterns)])

   reactor.stop()

def gotPage(pageContent, url, pattern_regex):
   matchingPatterns = set()
   for matchObj in pattern_regex.finditer(pageContent):
      matchingPatterns.add(matchObj.group(0))

   results[url] = matchingPatterns

def parallelFetchAndScan(urls, patterns):
   patterns.sort(key = lambda x: len(x), reverse = True)
   pattern_regex = re.compile('|'.join(patterns))

   deferreds = []
   for url in urls:
      d = client.getPage(url).addCallback(gotPage, url, pattern_regex)
      deferreds.append(d)

   defer.DeferredList(deferreds).addCallback(printResults)

if __name__ == "__main__":
   with open(sys.argv[1]) as urlFile:
      urls = urlFile.read().split()
   with open(sys.argv[2]) as patternsFile:
      patterns = patternsFile.read().split()

   parallelFetchAndScan(urls, patterns)
   reactor.run()

A few points are worth noting:
  • In parallelFetchAndScan we sort the patterns by decreasing length. This is because Python's regex alternation is ordered rather than longest-match: the engine tries the alternatives left to right and commits to the first one that matches at a given position, so a regex of a|ab will match a in the string abc. We would like to match the longest pattern possible, so longer patterns must come first (see the short demonstration after this list).
  • The next line, re.compile('|'.join(patterns)), is where the gigantic OR regex is created.
  • During matching, gotPage collects results into a set. As opposed to the previous algorithm, finditer can report the same pattern many times, so duplicates need to be eliminated.
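
A quick interactive demonstration of that alternation ordering (a toy example, not part of the program):

>>> import re
>>> re.findall('a|ab', 'abc')
['a']
>>> re.findall('ab|a', 'abc')
['ab']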
Let's time it:

roman@localhost:~/blog/prAv$ time python parallelFetchWithRegex.py urls.txt words.txt 

real    0m2.241s
user    0m1.244s
sys     0m0.120s


That's better! We are down by more than a factor of three compared with the first, simple attempt.

Note: We did get this at the cost of a slight functional change. Now, for example, the pattern Col is no longer matched for Google, since the combined regex reports only the longest pattern at each position, and Col only occurs inside longer matches such as Colo (in Colorado). This difference might be justifiable, especially if our goal is to find the most specific pattern.

To recap, we went through several successive optimisation stages, both at the architecture level (asynchronous, event-driven programming) and at the low level (regexes and state machines rather than scanning for each substring separately).

In the next post in this series, I'd like to go into further optimisation of this example by running and profiling the pattern matching on multiple cores.

Mini-acknowledgement: what I find good about these posts is the opportunity to learn while writing. Though I was aware of all of these resources before starting, I had to drill into many other options that did not make it into the post. It did take a few hours from start to finish, but it was worth the time.
