
msrest 0.6.0 Pipeline

Laurent Mazuel edited this page Jul 24, 2018 · 10 revisions

This page describes the new "pipeline" feature introduced in msrest 0.6.0 and its integration with Autorest-generated code.

Pipeline

Overview

Pipeline for Python combines a port of the Pipeline package for Go with ideas from the Python SansIO movement.

Important note: the pipeline implementation is designed to be independent of Autorest and could be externalized as a new library. Nothing described in this overview is related to Autorest; see the Autorest section for the Autorest integration.

For many years, the Python community mostly used requests as the de facto standard HTTP library. As a result, there was no need to separate the implementation choices of requests from the actual HTTP spec. This became problematic in the async world: requests was not available in async, and aiohttp was the simplest solution. Two libraries, two different APIs, but one purpose: making HTTP calls.

The h11 library separates the HTTP spec implementation from the actual HTTP client implementation, but it is low level. In the same spirit, Pipeline abstracts the high-level HTTP implementation behind a common HTTP layer that does not require knowing any deep implementation details. This is a basic example:

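import asyncio

from msrest.pipeline import ClientRequest, Pipeline, AsyncPipeline

request = ClientRequest("GET", "http://bing.com")

# Sync: the pipeline is a regular context manager
with Pipeline() as pipeline:
    response = pipeline.run(request)
    assert response.status_code == 200

# Async: "async with" and "await" are only valid inside a coroutine
async def run_async():
    async with AsyncPipeline() as pipeline:
        response = await pipeline.run(request)
        assert response.status_code == 200

asyncio.run(run_async())  # Python 3.7+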

The example:

  • Creates an HTTP request. This is pure HTTP, and we don't need to know how it will be implemented.
  • Runs the request with the "sync" approach. By default, the HTTP engine used is requests.
  • Runs the same request with the "async" approach. By default, the HTTP engine used is aiohttp.
  • In both cases, the response is a ClientResponse object.

We use the "pipeline" terminology because a set of policies can change the request before it is sent. Instead of writing massive, complex code to build the final request, we can add a policy that updates the request on the fly:

from msrest.pipeline import ClientRequest, Pipeline
from msrest.pipeline.universal import UserAgentPolicy, HeadersPolicy

request = ClientRequest("GET", "http://bing.com")
policies = [
    UserAgentPolicy("myuseragent"),
    HeadersPolicy({"Authorization": "Bearer complextoken"})
]

with Pipeline(policies=policies) as pipeline:
    response = pipeline.run(request)
    assert response.status_code == 200
    assert response.request.headers["User-Agent"] == "myuseragent"

(the same policies variable also works in AsyncPipeline)

If a policy only modifies or annotates the request based on the HTTP specification, it is a subclass of SansIOHTTPPolicy and works in either a Pipeline or an AsyncPipeline context.

Some policies, such as retry strategies, are more complex and need to take control of the HTTP workflow. In the current version, they are subclasses of HTTPPolicy or AsyncHTTPPolicy and can be used in only one pipeline type. We hope to improve the system in future versions so that more and more scenarios are available natively in SansIOHTTPPolicy.

Basic models

ClientRequest

A client request contains:

  • An HTTP method (GET, PUT, etc.)
  • A URL
  • A headers dict
  • A body (attribute data)
  • An attribute files, which is a shortcut to define a multipart body
  • A pipeline_context, which contains the execution context of this request in the current Pipeline

The pipeline context keeps information about the current state of the request, for example how many retries have been attempted for this request.
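
As a rough illustration of how a pipeline context might carry per-request state, the sketch below uses a hypothetical ToyRequest class (not the msrest ClientRequest) whose pipeline_context is assumed to be a plain dict, plus a hypothetical record_retry helper that counts attempts. The actual context type in msrest may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ToyRequest:
    """Hypothetical stand-in for ClientRequest; not the msrest class."""
    method: str
    url: str
    headers: Dict[str, str] = field(default_factory=dict)
    data: Optional[bytes] = None
    # Execution state of this request in the current pipeline (assumed to be a dict).
    pipeline_context: Dict[str, Any] = field(default_factory=dict)


def record_retry(request: ToyRequest) -> int:
    """Increment and return the number of retries attempted for this request."""
    count = request.pipeline_context.get("retry_count", 0) + 1
    request.pipeline_context["retry_count"] = count
    return count


toy_request = ToyRequest("GET", "http://bing.com")
record_retry(toy_request)
record_retry(toy_request)
print(toy_request.pipeline_context)  # {'retry_count': 2}
```

Keeping this state on the request rather than on the policy means a single policy instance can serve many requests without mixing up their counters.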

ClientResponse

A client response contains:

  • A request attribute, the ClientRequest instance that produced this response
  • A status_code, the HTTP status code
  • A headers dict
  • A reason, which is the reason phrase returned by the server (Not Found, etc.)
  • A body method that returns the whole body as bytes
  • A stream_download method that streams the body instead of loading it in memory
  • A raise_for_status method that raises if status_code is >= 400
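
To make the response contract above more concrete, here is a minimal sketch using a hypothetical ToyResponse class and a hypothetical ToyHTTPError exception. Neither is part of msrest, and the chunk_size parameter is an assumption made for the illustration.

```python
from dataclasses import dataclass, field
from typing import Dict


class ToyHTTPError(Exception):
    """Hypothetical error type; msrest defines its own exception classes."""


@dataclass
class ToyResponse:
    """Hypothetical stand-in for ClientResponse; not the msrest class."""
    status_code: int
    reason: str
    headers: Dict[str, str] = field(default_factory=dict)
    content: bytes = b""

    def body(self) -> bytes:
        # Return the whole body at once.
        return self.content

    def stream_download(self, chunk_size: int = 4):
        # Yield the body chunk by chunk instead of returning it in one piece.
        for start in range(0, len(self.content), chunk_size):
            yield self.content[start:start + chunk_size]

    def raise_for_status(self) -> None:
        # Any 4xx or 5xx status is treated as a failure.
        if self.status_code >= 400:
            raise ToyHTTPError("{} {}".format(self.status_code, self.reason))


ok = ToyResponse(200, "OK", content=b"hello world")
ok.raise_for_status()                 # no exception for a 2xx status
chunks = list(ok.stream_download())   # [b'hell', b'o wo', b'rld']

missing = ToyResponse(404, "Not Found")
try:
    missing.raise_for_status()
except ToyHTTPError as err:
    error_text = str(err)             # '404 Not Found'
```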

HTTPPolicy

HTTPPolicy is a simple abstract class with an abstract method send:

    @abc.abstractmethod
    def send(self, request, **kwargs):
        # type: (ClientRequest, Any) -> ClientResponse
        """Mutate the request.
        """
        return self.next.send(request, **kwargs)

  • next is a read-only attribute that contains a pointer to the next policy or HTTP engine.
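
The chaining through next can be sketched with toy classes. Everything below is hypothetical (ToyPolicy, FlakySender, ToyRetryPolicy, build_chain) and only mimics the shape of HTTPPolicy. In particular, next is a plain writable attribute here, and requests and responses are simple dicts. The example also shows why a retry strategy needs control of the workflow: it calls the next element more than once.

```python
import abc


class ToyPolicy(abc.ABC):
    """Hypothetical stand-in for HTTPPolicy; `next` is a plain attribute here."""

    def __init__(self):
        self.next = None  # set by build_chain

    @abc.abstractmethod
    def send(self, request, **kwargs):
        """Forward the request to the next element and return its response."""


class FlakySender(ToyPolicy):
    """Fake HTTP engine: answers 503 a fixed number of times, then 200."""

    def __init__(self, failures):
        super().__init__()
        self.failures = failures

    def send(self, request, **kwargs):
        if self.failures > 0:
            self.failures -= 1
            return {"status_code": 503, "request": request}
        return {"status_code": 200, "request": request}


class ToyRetryPolicy(ToyPolicy):
    """Calls the next element again while the response status is 5xx."""

    def __init__(self, max_retries=3):
        super().__init__()
        self.max_retries = max_retries

    def send(self, request, **kwargs):
        response = self.next.send(request, **kwargs)
        retries = 0
        while response["status_code"] >= 500 and retries < self.max_retries:
            retries += 1
            response = self.next.send(request, **kwargs)
        response["retries"] = retries
        return response


def build_chain(policies, sender):
    """Link each policy to the following one; the sender ends the chain."""
    elements = list(policies) + [sender]
    for current, following in zip(elements, elements[1:]):
        current.next = following
    return elements[0]


first = build_chain([ToyRetryPolicy(max_retries=3)], FlakySender(failures=2))
result = first.send({"method": "GET", "url": "http://bing.com"})
print(result["status_code"], result["retries"])  # 200 2
```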

AsyncHTTPPolicy

AsyncHTTPPolicy is simply the async version of HTTPPolicy:

    @abc.abstractmethod
    async def send(self, request, **kwargs):
        # type: (ClientRequest, Any) -> ClientResponse
        """Mutate the request.
        """
        return await self.next.send(request, **kwargs)

  • next is a read-only attribute that contains a pointer to the next policy or HTTP engine.
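
A small async sketch of the same idea, using only asyncio and hypothetical classes (ToyAsyncHeaderPolicy, EchoAsyncSender) rather than the msrest ones. The sender performs no network I/O and simply echoes the request back, so the example only illustrates how each element awaits the next.

```python
import asyncio


class ToyAsyncHeaderPolicy:
    """Hypothetical async policy: sets a header, then awaits the next element."""

    def __init__(self, name, value):
        self.name = name
        self.value = value
        self.next = None  # plain attribute in this sketch

    async def send(self, request, **kwargs):
        request["headers"][self.name] = self.value
        return await self.next.send(request, **kwargs)


class EchoAsyncSender:
    """Fake async HTTP engine: no network I/O, echoes the request back."""

    async def send(self, request, **kwargs):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return {"status_code": 200, "request": request}


async def run_async_example():
    policy = ToyAsyncHeaderPolicy("User-Agent", "myuseragent")
    policy.next = EchoAsyncSender()
    request = {"method": "GET", "url": "http://bing.com", "headers": {}}
    return await policy.send(request)


async_result = asyncio.run(run_async_example())
print(async_result["request"]["headers"])  # {'User-Agent': 'myuseragent'}
```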

SansIOHTTPPolicy

This is a simple abstract class that can act before the request is sent, or after the response comes back. For instance:

  • Setting headers in the request
  • Serializing a complex model into a body as bytes
  • Deserializing a raw bytes response to a complex model
  • Logging the request in and the response out

    def prepare(self, request, **kwargs):
        """Executed before the request is sent to the next policy.
        """
        pass

    def post_send(self, request, response, **kwargs):
        """Executed after the response comes back from the next policy.
        """
        pass
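
One way to see why such a policy works in both pipeline types: since it performs no I/O itself, a thin wrapper can call prepare and post_send around either a blocking or an awaited call. The sketch below assumes this wrapping approach. SyncRunner, AsyncRunner, the echo senders, and the x-ms-example header are all hypothetical names, not msrest APIs.

```python
import asyncio


class ToySansIOPolicy:
    """Hypothetical SansIO-style policy: no I/O, only request/response hooks."""

    def prepare(self, request, **kwargs):
        request["headers"]["x-ms-example"] = "1"

    def post_send(self, request, response, **kwargs):
        response["seen_by_policy"] = True


class SyncRunner:
    """Wraps a SansIO policy so it can sit in a synchronous chain."""

    def __init__(self, policy, next_element):
        self.policy = policy
        self.next = next_element

    def send(self, request, **kwargs):
        self.policy.prepare(request, **kwargs)
        response = self.next.send(request, **kwargs)
        self.policy.post_send(request, response, **kwargs)
        return response


class AsyncRunner:
    """Same wrapping for an asynchronous chain; only the I/O call is awaited."""

    def __init__(self, policy, next_element):
        self.policy = policy
        self.next = next_element

    async def send(self, request, **kwargs):
        self.policy.prepare(request, **kwargs)
        response = await self.next.send(request, **kwargs)
        self.policy.post_send(request, response, **kwargs)
        return response


class SyncEcho:
    """Fake sync HTTP engine that echoes the request headers."""

    def send(self, request, **kwargs):
        return {"status_code": 200, "sent_headers": dict(request["headers"])}


class AsyncEcho:
    """Fake async HTTP engine that echoes the request headers."""

    async def send(self, request, **kwargs):
        return {"status_code": 200, "sent_headers": dict(request["headers"])}


shared_policy = ToySansIOPolicy()  # the same policy object serves both runners
sync_response = SyncRunner(shared_policy, SyncEcho()).send({"headers": {}})
async_response = asyncio.run(
    AsyncRunner(shared_policy, AsyncEcho()).send({"headers": {}})
)
```

Both responses carry the header set in prepare and the flag set in post_send, although the policy itself contains no sync-specific or async-specific code.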

HTTPEngine

Generic HTTP configuration

Some of the basic configuration of an HTTP client relates mostly to the HTTP spec and can be externalized as a common set of parameters that all HTTP implementations should support, for example proxies or a custom certificate. This is the purpose of the HTTPSenderConfiguration class.

| Section         | Attribute        | Type | Default Value | Description                              |
|-----------------|------------------|------|---------------|------------------------------------------|
| Connection      | timeout          | int  | 100           | Timeout of connect or read connection    |
|                 | verify           | bool | True          | Should I verify the certificate          |
|                 | cert             | str  | None          | Path to a certificate                    |
| Proxies         | proxies          | dict | {}            | Proxies, keys are protocol               |
|                 | use_env_settings | bool | True          | Trust HTTP_PROXY env variable and .netrc |
| Redirect_Policy | allow            | bool | True          | Allow redirect                           |
|                 | max_redirects    | int  | 30            | Max redirection before failing           |

Example:

from msrest.pipeline import ClientRequest, Pipeline, HTTPSenderConfiguration, get_default_engine

config = HTTPSenderConfiguration()
config.connection.timeout = 10
config.redirect_policy.allow = False

request = ClientRequest("GET", "http://bing.com")
with Pipeline(sender=get_default_engine(config)) as pipeline:
    pipeline.run(request)

(the same configuration object can be used for async)

requests based engine

FIXME: describe what parameters are only available for this engine for complex scenario

aiohttp based engine

FIXME: describe what parameters are only available for this engine for complex scenario

Autorest

Autorest uses Pipeline as its HTTP client as of msrest 0.6.0. The default sync Pipeline implementation is as follows:

    def _create_default_pipeline(self):
        # type: () -> Pipeline

        policies = [
            self.config.user_agent_policy,  # UserAgent policy with msrest version
            RequestsPatchSession(),         # Support deprecated operation config at the session level
            self.config.http_logger_policy  # HTTP request/response log
        ]  # type: List[Union[HTTPPolicy, SansIOHTTPPolicy]]
        if self._creds:
            policies.insert(1, RequestsCredentialsPolicy(self._creds))  # Set credentials for requests based session

        return Pipeline(
            policies,
            RequestsHTTPSender(self.config)  # Send HTTP request using requests
        )

With an Autorest client instance in a client variable, the pipeline can be changed at any time using:

client.config.pipeline = Pipeline(**custom_own_parameters)

This implementation is 100% backward compatible with all SDKs released in the last two years.