This details how to enable concurrent requests in the Go App Engine SDK.
UPDATES:
Nov 15:
Added a note that the Python SDK is currently not thread-safe. This post shows how to make the Go side thread-safe and still test concurrency in your application (even though only one API request is processed at a time).
Background
The Go App Engine SDK has a pretty elegant design, which I wish the Java App Engine SDK had. Full support for the App Engine services is implemented once (in the Python SDK), and a new language runtime (like Go) can be introduced quickly by leveraging that investment (as opposed to duplicating it). Brilliant.
It also simulates what happens in production to an extent, where there’s an App Engine instance that runs your application, but uses RPC (remote procedure calls) to access services provided by App Engine.
In this setup, the Python SDK which supports all the App Engine Runtime services acts as two things:
Getting everything to work is pretty neat.
When a request comes through the Python SDK for the Go App, the following happens:
Currently, the design has some limitations that allow only one request be handled at a time:
Objective:
The objective here is to support concurrent requests. This can be done by making the Python SDK a full proxy, with standalone support as an API RPC server (outside the context of a request).
This will allow more involved testing scenarios:
To summarize, these are the things we hope to achieve:
To achieve this, the following changes are necessary:
This support requires minor edits to 2 files, a more involved edit to 1 file, and a one-line change in your app.yaml:
I’ve shared a folder containing all of the changed files online here. Feel free to download the changed files and follow through. For all changes, look for the name “ugorji” in a comment in the file before each change.
But Python SDK does NOT support concurrent requests
Yes. Even with these changes, requests to the Python SDK are still inherently single-threaded:
Thus, these changes make the Go side run concurrently. A user can add a back-door HTTP listen port and access the Go instance directly. I do this within an init() or sync.Once.Do(…), surrounded by an if appengine.IsDevAppServer() { … } check:
http.HandleFunc("/", ...)
http.ListenAndServe(":9999", nil)
Also, within your top-level request handling code, check that the header used for contexts is set. This is necessary because the Python SDK adds it to the headers proxied to your application. If you bypass the Python proxy, you must at a minimum set it yourself (before creating any appengine.Context).
if r.Header.Get("X-AppEngine-Inbound-AppId") == "" {
	r.Header.Set("X-AppEngine-Inbound-AppId", "dev~app")
}
After that, you can make requests at http://localhost:9999 and reach your application. Requests through this URL can run concurrently. Access to the APIs is still serial (one at a time), but you can still test concurrency in general for your application: only API requests block, and everything else runs concurrently.
When the Python SDK becomes thread-safe, we only need to make a few changes to be compliant.
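To exercise the back-door port, it helps to fire several requests at once and check that they all come back. The following is only a sketch, not part of the SDK changes: the helper name fetch_concurrently is made up for illustration, and it assumes the Go app is listening on the port 9999 used above. It uses Python 3 and the standard library only, and bypasses any configured HTTP proxy because the target is local.

```python
import threading
import urllib.error
import urllib.request


def fetch_concurrently(url, count, timeout=10.0):
    """Issue `count` GET requests to `url` at roughly the same time.

    Returns a list of HTTP status codes, one per request, in the order the
    workers were created. A request that fails to connect yields None.
    (Hypothetical helper for illustration; not part of any SDK.)
    """
    results = [None] * count
    barrier = threading.Barrier(count)
    # An empty ProxyHandler disables proxies, since the dev server is local.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))

    def worker(index):
        # Block until every worker is ready, so the requests overlap.
        barrier.wait()
        try:
            with opener.open(url, timeout=timeout) as resp:
                results[index] = resp.status
        except urllib.error.HTTPError as err:
            # The server answered, just not with a 2xx status.
            results[index] = err.code
        except OSError:
            # Connection refused, timeout, and similar failures.
            results[index] = None

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Calling fetch_concurrently("http://localhost:9999/", 8) against a running app should return eight status codes. Pointing the same call at the normal Python SDK port is a simple way to compare the serialized path with the back-door path.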
Tangent: Testing Support
This is a tangent … more details here
Gotchas
For some reason, excessive logging calls, especially when logging things which don’t fit nicely in ASCII, break the dev server. That’s why all the logging calls I added are commented out.
Walkthrough of Changes:
As mentioned earlier, we need to make minor edits to 2 files, a more involved edit to 1 file, and a one-line change in your app.yaml:
For each change, we:
google/appengine/tools/dev_appserver_main.py:
# Before call to http_server.serve_forever() top-level try block:
# Call go_init(...) to initialize everything
if appinfo.runtime == 'go':
  from google.appengine.ext.go import go_init
  go_init(port, root_path, allow_skipped_files)
google/appengine/tools/dev_appserver.py:
# Ensure go uses py27 handler
# In def ExecuteCGI:
  # if handler_path and config and config.runtime == 'python27':
  if handler_path and config and (config.runtime == 'go' or config.runtime == 'python27'):
    reset_modules = exec_py27_handler(config, handler_path, cgi_path, hook)

# To remove vestiges of old CGI model (ie execute_go_cgi)
# Remove "if handler_path == '_go_app':" block
  # if handler_path == '_go_app':
  #   from google.appengine.ext.go import execute_go_cgi
  #   return execute_go_cgi(root_path, handler_path, cgi_path,
  #                         env, infile, outfile)
google/appengine/ext/go/__init__.py:
# Add to end of imports
import threading
import StringIO

# At top, after imports, define variable
# It is set later on by the go_init call (based on --allow_skipped_files flag to python sdk)
ALLOW_SKIPPED_FILES = False

# Define function go_safe_make_and_run
# This uses a lock to ensure that make_and_run is not called in parallel
_make_and_run_lock = threading.Lock()
def go_safe_make_and_run():
  global GO_APP, _make_and_run_lock
  _make_and_run_lock.acquire()
  try:
    GO_APP.make_and_run()
  finally:
    _make_and_run_lock.release()

# At bottom: define new function: go_init(...)
# It takes the one-time setup out of execute_go_cgi, and makes it a function called by dev_appserver_main.py
# Also uses a single daemon thread to asynchronously manage all API socket communication
def go_init(port, root_path, allow_skipped_files):
  global RAPI_HANDLER, SOCKET_API, SOCKET_HTTP, GAB_WORK_DIR, GO_APP, ALLOW_SKIPPED_FILES
  if not RAPI_HANDLER:
    user_port = '%s_%s' % (getpass.getuser(), port)
    SOCKET_API = SOCKET_API % user_port
    SOCKET_HTTP = SOCKET_HTTP % user_port
    GAB_WORK_DIR = gab_work_dir() % user_port
    cleanup()
    atexit.register(cleanup)
    RAPI_HANDLER = handler.ApiCallHandler()
    GO_APP = GoApp(root_path)
    ALLOW_SKIPPED_FILES = allow_skipped_files
    logging.info("Calling Delegate Server Now")
    ds = DelegateServer()
    logging.info("Socket server: %s: %s", SOCKET_API, os.stat(SOCKET_API))
    def apiConnLoop():
      while ds.connected or ds.accepting:
        asyncore.loop(map=ds._map, count=1)
    # The thread target must match the function defined above
    th = threading.Thread(target=apiConnLoop)
    th.setDaemon(True)
    th.start()

# To remove vestiges of old CGI model (ie execute_go_cgi)
# Completely remove execute_go_cgi method

# *** If you don't remove execute_go_cgi method, ...
# In execute_go_cgi(root_path, handler_path, cgi_path, env, infile, outfile):
# Since setup was moved to a global function, comment out "if not RAPI_HANDLER: " block
  # global RAPI_HANDLER, GAB_WORK_DIR, SOCKET_HTTP, SOCKET_API, GO_APP
  # if not RAPI_HANDLER:
  #   user_port = '%s_%s' % (getpass.getuser(), env['SERVER_PORT'])
  #   GAB_WORK_DIR = gab_work_dir() % user_port
  #   SOCKET_HTTP = SOCKET_HTTP % user_port
  #   SOCKET_API = SOCKET_API % user_port
  #   atexit.register(cleanup)
  #   DelegateServer()
  #   RAPI_HANDLER = handler.ApiCallHandler()
  global HEADER_MAP
  go_safe_make_and_run()

# *** If you don't remove execute_go_cgi method, at least ...
# In execute_go_cgi, update call to asyncore.loop(...), to only use current map
# This way, it only loops over the socket for its request
  x = DelegateClient(http_req)
  while not x.closed:
    asyncore.loop(map=x._map, count=1)

# In RemoteAPIHandler(asyncore.dispatcher_with_send):
# Update constructor to take a map, so it can store its request map for use by asyncore.loop
class RemoteAPIHandler(asyncore.dispatcher_with_send):
  def __init__(self, sock, map=None):
    asyncore.dispatcher_with_send.__init__(self, sock, map=map)

# In DelegateServer.handle_accept
# Pass the DelegateServer map in to constructor of RemoteAPIHandler
# This way, we only have 1 thread that loops over all API Socket communication
    RemoteAPIHandler(sock, self._map)

# *** Do this only if python sdk is still not threadsafe ***
# In DelegateServer.__init__, it should only listen to 1 connection max
# This way, even with multiple connections, only 1 is served at a time
    self.listen(1)

# In find_app_files(basedir), add check for ALLOW_SKIPPED_FILES:
# This way, skip_files can be used in dev environment (if --allow_skipped_files flag given)
# This is necessary, to allow for testing, and using artifacts which should not make it to production
      if not ALLOW_SKIPPED_FILES and APP_CONFIG.skip_files.match(ename):
        continue

# support wsgi for python2.7, by defining your WSGI application as a single callable function.
# Note that it doesn't use webob. Just plain WSGI protocol.
def go_wsgi(environ, start_response):
  global HEADER_MAP
  go_safe_make_and_run()
  request_method = environ['REQUEST_METHOD']
  server_protocol = environ['SERVER_PROTOCOL']
  url = environ.get('SCRIPT_NAME', '')
  url += environ.get('PATH_INFO', '')
  if environ.get('QUERY_STRING'):
    url += '?' + environ['QUERY_STRING']
  body = ''
  length = 0
  if environ.get('CONTENT_LENGTH'):
    length = int(environ['CONTENT_LENGTH'])
    body = environ['wsgi.input'].read(length)

  # Rebuild the HTTP request headers from the WSGI environ
  headers = {}
  for k, v in environ.items():
    if k in HEADER_MAP:
      headers[HEADER_MAP[k]] = v
    elif k.startswith('HTTP_'):
      hk = k[5:].replace("_", "-")
      if hk.title() == 'Connection':
        continue
      headers[hk] = v
  headers['Content-Length'] = str(length)
  headers['Connection'] = 'close'
  #logging.info("headers: %s", headers)

  # Serialize the request and send it to the Go app
  hrl = []
  hrl.append(request_method + ' ' + url + ' ' + server_protocol)
  for k, v in headers.items():
    hrl.append(k + ': ' + v)
  hrl.append("")
  hrl.append(body)
  http_req = '\r\n'.join(hrl)
  x = DelegateClient(http_req)
  #logging.info("x.closed: %s, map: %s", x.closed, x._map)
  while not x.closed:
    asyncore.loop(map=x._map, count=1)

  # Parse the raw HTTP response into status, headers and body for WSGI
  res = x.result
  if res.startswith('HTTP/1.0 ') or res.startswith('HTTP/1.1 '):
    fp = StringIO.StringIO(res)
    headerlist = []
    line1 = fp.readline().strip()
    while True:
      line = fp.readline().strip()
      if not line:
        break
      header_name, value = line.split(':', 1)
      headerlist.append((header_name, value.strip()))
    body = fp.read()
    start_response(line1[9:], headerlist)
    return [body]
  else:
    start_response("500 Internal Server Error", [('Content-Type', 'text/plain')])
    return ['Internal Server Error']

# Define top-level variable in the module that can be referred in app.yaml
# This one must handle exceptions and populate the result appropriately, so
def GO_WSGI_APP(environ, start_response):
  x = None
  try:
    x = go_wsgi(environ, start_response)
  except dev_appserver.CompileError, ex:
    start_response("500 Internal Server Error", [('Content-Type', 'text/plain')])
    x = [ex.text]
  except Exception, ex:
    start_response("500 Internal Server Error", [('Content-Type', 'text/plain')])
    x = [str(ex)]
  return x
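The last part of go_wsgi turns the raw HTTP response from the Go app into the status string and header list that WSGI's start_response expects. As a rough illustration of that step in isolation, here is a small standalone sketch. The function name split_http_response is hypothetical, it is written for Python 3 with plain str input (the SDK code above is Python 2), and it splits on the blank line rather than reading line by line as go_wsgi does.

```python
def split_http_response(raw):
    """Split a raw HTTP/1.x response into (status, headers, body).

    `status` is the text after the protocol version, e.g. "200 OK".
    `headers` is a list of (name, value) tuples, the shape that a WSGI
    start_response callable expects.
    Raises ValueError if `raw` does not start with an HTTP/1.x status line.
    (Hypothetical helper for illustration; not part of the SDK.)
    """
    if not (raw.startswith("HTTP/1.0 ") or raw.startswith("HTTP/1.1 ")):
        raise ValueError("not an HTTP/1.x response")

    # Headers end at the first blank line; everything after it is the body.
    head, _, body = raw.partition("\r\n\r\n")
    lines = head.split("\r\n")

    # Drop the 9-character "HTTP/1.x " prefix, as line1[9:] does in go_wsgi.
    status = lines[0][len("HTTP/1.x "):].strip()

    headers = []
    for line in lines[1:]:
        # Split on the first colon only, so values may contain colons.
        name, _, value = line.partition(":")
        headers.append((name.strip(), value.strip()))
    return status, headers, body
```

A WSGI callable could then do status, headers, body = split_http_response(res), call start_response(status, headers), and return [body], falling back to a 500 response when ValueError is raised, which mirrors the else branch in go_wsgi.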
app.yaml
# use the WSGI app as your script, instead of _go_app
- url: /.*
  script: google.appengine.ext.go.GO_WSGI_APP
  # script: _go_app