OpenStack Swift 作为开源的云存储系统,被越来越多的公司使用。为了记录和巩固对 Swift 开源源码的学习,我整理了一系列源码学习笔记,供初学者快速学习和理解 Swift 的内部实现。
proxy 下的 server.py 模块是 Swift proxy 端对 account、container、object 等资源进行管理操作的总入口。Swift 接收到 URL 请求后,先经过 middleware 处理链逐个验证和处理,再进入 proxy 下的 server 模块;在 __call__ 方法中,请求被分发给对应的 controller 处理,controller 再调用各自的存储节点(node storage)服务进行处理,并把各自的 resp 结果返回给 server 模块,最后沿 middleware 处理链反方向返回最终的请求处理结果。
1. server.py 的整体结构如下,包括 4 部分:一组 import 引用;一个名为 required_filters 的列表(每个元素是一个字典);一个名为 “Application(object)” 的 class;以及一个 app_factory(global_conf, **local_conf) 方法。
2.主要介绍“Application(object)”的class,其中包含了所有主要的功能方法
2.1 __init__ 方法:Application 类的初始化方法,主要用于初始化各种对象,包括:conf 配置参数的初始化、log 日志初始化、memcache 对象初始化,以及 account_ring、container_ring、object_ring 对象的初始化等。
2.2 check_config(self) 方法:检查配置文件 proxy-server.conf 中 “read_affinity” 与 “sorting_method” 的搭配是否合理——如果设置了 read_affinity,但 sorting_method 不是 affinity,则记录一条警告(此时 read_affinity 不会生效)。该方法在 app_factory(global_conf, **local_conf) 中被调用。
2.3 get_controller(self, path) 方法:根据传入的 URL path 解析并返回对应的 controller 类和一个字典对象,字典的内容随 URL 格式的不同而不同。
def get_controller(self, path):
    """
    Map a request path to the controller class that should handle it.

    :param path: path from request
    :returns: tuple of (controller class or None, dict of keyword arguments
              for that controller's constructor)
    :raises: ValueError (thrown by split_path) if given invalid path
    """
    # "/info" is special-cased: it is not an account/container/object path,
    # so the info settings are handed to InfoController instead.
    if path == '/info':
        info_kwargs = dict(version=None,
                           expose_info=self.expose_info,
                           disallowed_sections=self.disallowed_sections,
                           admin_key=self.admin_key)
        return InfoController, info_kwargs

    # Split into 1 to 4 segments: version, account, container, object.
    # The trailing True asks split_path to keep the remainder in the last
    # segment (presumably so object names may contain '/'; see split_path).
    version, account, container, obj = split_path(path, 1, 4, True)
    parts = dict(version=version,
                 account_name=account,
                 container_name=container,
                 object_name=obj)

    # The deepest segment that is present decides which controller is used.
    if account and container and obj:
        controller_cls = ObjectController
    elif account and container:
        controller_cls = ContainerController
    elif account and not (container or obj):
        controller_cls = AccountController
    else:
        controller_cls = None
    return controller_cls, parts
2.4 __call__(self, env, start_response) 方法:server 模块的 WSGI 入口,对 account、container、object 等的所有请求都从这里进入处理。
def __call__(self, env, start_response):
    """
    WSGI entry point.

    Wraps env in a swob.Request object and passes it down to
    handle_request.

    :param env: WSGI environment dictionary
    :param start_response: WSGI callable
    """
    # Bound before the try so the UnicodeError handler below never hits an
    # unbound local (which would escape as an unhandled error) when the
    # exception is raised before the request object has been built.
    req = None
    try:
        # If no memcache client was supplied at construction time, look one
        # up from the WSGI environment.
        if self.memcache is None:
            self.memcache = cache_from_env(env)
        # update_request copies X-Storage-Token into X-Auth-Token when only
        # the former is present.
        req = self.update_request(Request(env))
        # handle_request returns a WSGI callable (e.g. a swob.Response),
        # which is invoked to produce the response body.
        return self.handle_request(req)(env, start_response)
    except UnicodeError:
        err = HTTPPreconditionFailed(
            request=req, body='Invalid UTF8 or contains NULL')
        return err(env, start_response)
    except (Exception, Timeout):
        start_response('500 Server Error',
                       [('Content-Type', 'text/plain')])
        return ['Internal server error.\n']
2.5 update_request(self, req) 方法:当请求 header 中有 x-storage-token 而没有 x-auth-token 时,把 x-storage-token 的值赋给 x-auth-token。
2.6 handle_request(self, req)方法,server模块实际处理request请求的方法,熟悉servlet的同学可以把它理解成servlet的作用
def handle_request(self, req):
    """
    Entry point for proxy server.

    Validates the request, chooses and instantiates the controller for the
    request path, runs the ``swift.authorize`` hook (if any) and dispatches
    to the controller method named after the HTTP verb.

    Should return a WSGI-style callable (such as swob.Response).

    :param req: swob.Request object
    """
    try:
        # Reset the statsd prefix; it is narrowed to the controller's
        # server type once the controller is known.
        self.logger.set_statsd_prefix('proxy-server')
        # Reject a negative Content-Length.
        if req.content_length and req.content_length < 0:
            self.logger.increment('errors')
            return HTTPBadRequest(request=req,
                                  body='Invalid Content-Length')

        # Reject paths that are not valid UTF-8 (or contain NULL).
        try:
            if not check_utf8(req.path_info):
                self.logger.increment('errors')
                return HTTPPreconditionFailed(
                    request=req, body='Invalid UTF8 or contains NULL')
        except UnicodeError:
            self.logger.increment('errors')
            return HTTPPreconditionFailed(
                request=req, body='Invalid UTF8 or contains NULL')

        try:
            controller, path_parts = self.get_controller(req.path)
        except ValueError:
            # split_path rejected the path.
            self.logger.increment('errors')
            return HTTPNotFound(request=req)
        if not controller:
            # The path was parseable but does not name a resource type.
            self.logger.increment('errors')
            return HTTPPreconditionFailed(request=req, body='Bad URL')
        # Reject requests whose Host header (with any port stripped) is in
        # the configured deny_host_headers list.
        if self.deny_host_headers and \
                req.host.split(':')[0] in self.deny_host_headers:
            return HTTPForbidden(request=req, body='Invalid host header')

        # Narrow the statsd prefix to the controller class's server_type.
        self.logger.set_statsd_prefix('proxy-server.' +
                                      controller.server_type.lower())
        # Instantiate the controller (Account/Container/Object/Info).
        controller = controller(self, **path_parts)
        if 'swift.trans_id' not in req.environ:
            # if this wasn't set by an earlier middleware, set it now
            trans_id = generate_trans_id(self.trans_id_suffix)
            req.environ['swift.trans_id'] = trans_id
            self.logger.txn_id = trans_id
        req.headers['x-trans-id'] = req.environ['swift.trans_id']
        controller.trans_id = req.environ['swift.trans_id']
        # Record the client address on the logger for later log lines.
        self.logger.client_ip = get_remote_client(req)
        try:
            # The handler is the controller method named after the verb.
            handler = getattr(controller, req.method)
            # Only methods carrying the ``publicly_accessible`` attribute
            # (presumably set by the @public decorator) may be dispatched
            # to; anything else raises AttributeError and becomes a 405.
            getattr(handler, 'publicly_accessible')
        except AttributeError:
            allowed_methods = getattr(controller, 'allowed_methods', set())
            return HTTPMethodNotAllowed(
                request=req, headers={'Allow': ', '.join(allowed_methods)})
        if 'swift.authorize' in req.environ:
            # We call authorize before the handler, always. If authorized,
            # we remove the swift.authorize hook so isn't ever called
            # again. If not authorized, we return the denial unless the
            # controller's method indicates it'd like to gather more
            # information and try again later.
            resp = req.environ['swift.authorize'](req)
            if not resp:
                # No resp means authorized, no delayed recheck required.
                del req.environ['swift.authorize']
            else:
                # Response indicates denial, but we might delay the denial
                # and recheck later. If not delayed, return the error now.
                if not getattr(handler, 'delay_denial', None):
                    return resp
        # Save off original request method (GET, POST, etc.) in case it
        # gets mutated during handling. This way logging can display the
        # method the client actually sent.
        req.environ['swift.orig_req_method'] = req.method
        return handler(req)
    except HTTPException as error_response:
        return error_response
    except (Exception, Timeout):
        self.logger.exception(_('ERROR Unhandled exception in request'))
        return HTTPServerError(request=req)
2.7 sort_nodes(self, nodes) 方法:按配置的排序策略(默认随机打乱,或 timing / affinity)对 nodes 列表进行排序,该方法在 iter_nodes(self, ring, partition, node_iter=None) 中调用。
def sort_nodes(self, nodes):
    """
    Sorts nodes in-place (and returns the sorted list) according to
    the configured strategy. The default "sorting" is to randomly
    shuffle the nodes. If the "timing" strategy is chosen, the nodes
    are sorted according to the stored timing data. If the "affinity"
    strategy is chosen, the nodes are sorted with the key function built
    from the read_affinity setting (``self.read_affinity_sort_key``).

    :param nodes: list of node dicts (each needs an 'ip' key for "timing")
    :returns: the same list object, reordered
    """
    # Always shuffle first. Python's sort is stable, so for the timing and
    # affinity strategies the shuffle ensures nodes with equal keys (e.g.
    # timings equal within the rounding resolution) are not always
    # returned in the same order.
    shuffle(nodes)
    if self.sorting_method == 'timing':
        now = time()

        def key_func(node):
            # Nodes without an unexpired timing entry sort first (-1.0).
            timing, expires = self.node_timings.get(node['ip'], (-1.0, 0))
            return timing if expires > now else -1.0
        nodes.sort(key=key_func)
    elif self.sorting_method == 'affinity':
        nodes.sort(key=self.read_affinity_sort_key)
    return nodes
2.8 set_node_timing(self, node, timing) 方法:提供给外部程序调用;仅当 sorting_method 为 timing 时,记录节点的响应时间(四舍五入到毫秒)及该记录的过期时间。
2.9 error_limited(self, node)方法,该方法在iter_nodes(self, ring, partition, node_iter=None)中调用
def error_limited(self, node):
    """
    Check if the node is currently error limited.

    A node is error limited when its error count exceeds
    ``self.error_suppression_limit``. If the node's last error is older
    than ``self.error_suppression_interval`` seconds, its ``errors`` and
    ``last_error`` entries are removed and it is treated as healthy again.

    :param node: dictionary of node to check (mutated when its error
                 history has expired)
    :returns: True if error limited, False otherwise
    """
    now = time()
    if 'errors' not in node:
        # No recorded errors at all.
        return False
    if 'last_error' in node and \
            node['last_error'] < now - self.error_suppression_interval:
        # The last error is older than the suppression interval: forget
        # the node's error history.
        del node['last_error']
        node.pop('errors', None)
        return False
    limited = node['errors'] > self.error_suppression_limit
    if limited:
        self.logger.debug(
            _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
    return limited
2.10 error_limit(self, node, msg) 方法:提供给外部程序调用,把 node 的 errors 直接设为系统允许次数(error_suppression_limit)+1,使该节点立即被限制,同时记录 last_error 时间并写错误日志。
2.11 error_occurred(self, node, msg) 方法:提供给外部程序调用,把 node 的 errors 次数加 1,并记录 last_error 时间和写错误日志。
2.12 iter_nodes(self, ring, partition, node_iter=None) 方法:提供给外部程序调用,返回一个节点生成器:先产出排序后的 primary 节点,再产出 handoff 节点,跳过被 error limit 的节点,并在产出 request_node_count 个可用节点后停止。
2.13 exception_occurred(self, node, typ, additional_info)方法,提供给外部程序调用,用于当node发生异常了,进行日志记录
2.14 modify_wsgi_pipeline(self, pipe) 方法:在创建 WSGI pipeline 时被调用,确保 required_filters 中列出的必需 middleware 都在 pipeline 中(缺失的会被插入到合适位置),并做日志记录。
def modify_wsgi_pipeline(self, pipe):
    """
    Make sure every mandatory middleware is present in the WSGI pipeline.

    Called while the WSGI pipeline is being created. Each entry of the
    module-level ``required_filters`` whose name is missing from ``pipe``
    is created and inserted just after the last of its ``after_fn``
    predecessors that is present, or at the front if none are.

    :param pipe: A PipelineWrapper object
    """
    def no_predecessors(_junk):
        # Default for entries that declare no 'after_fn'.
        return []

    changed = False
    # Walk the list backwards: when several missing filters are inserted
    # at the same position, they end up in the order they are listed.
    for spec in reversed(required_filters):
        name = spec['name']
        if name in pipe:
            continue
        predecessors = spec.get('after_fn', no_predecessors)(pipe)
        position = 0
        for pred in predecessors:
            try:
                position = max(position, pipe.index(pred) + 1)
            except ValueError:
                # predecessor is not in the pipeline; ignore it
                continue
        self.logger.info(
            'Adding required filter %s to pipeline at position %d' %
            (name, position))
        pipe.insert_filter(pipe.create_filter(name), index=position)
        changed = True

    if not changed:
        self.logger.debug("Pipeline is \"%s\"", pipe)
    else:
        self.logger.info("Pipeline was modified. New pipeline is \"%s\".",
                         pipe)
以下源码为 2014 年 3 月 12 日时最新的 proxy/server.py 源码,只添加了部分代码注释:
- # Copyright (c) 2010-2012 OpenStack Foundation
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- # implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import mimetypes
- import os
- import socket
- from swift import gettext_ as _
- from random import shuffle
- from time import time
- import itertools
- from eventlet import Timeout
- from swift import __canonical_version__ as swift_version
- from swift.common import constraints
- from swift.common.ring import Ring
- from swift.common.utils import cache_from_env, get_logger, \
- get_remote_client, split_path, config_true_value, generate_trans_id, \
- affinity_key_function, affinity_locality_predicate, list_from_csv, \
- register_swift_info
- from swift.common.constraints import check_utf8
- from swift.proxy.controllers import AccountController, ObjectController, \
- ContainerController, InfoController
- from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
- HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
- HTTPServerError, HTTPException, Request
# Mandatory middlewares: the proxy inserts each of these into its WSGI
# pipeline (see Application.modify_wsgi_pipeline) when the configured
# pipeline does not already contain it.
#
# Every entry is a dict with:
#
#   "name" (required): the entry point name from setup.py.
#
#   "after_fn" (optional): a callable that takes the PipelineWrapper and
#   returns the names of middlewares this one should be placed after.
#   Names that are not in the pipeline are ignored, so optional
#   middlewares may be listed safely. For example, ["catch_errors", "bulk"]
#   places the middleware after both when both are present, or just after
#   catch_errors when bulk is absent.
required_filters = [
    {
        'name': 'catch_errors',
    },
    {
        'name': 'gatekeeper',
        # Follow catch_errors only when the pipeline starts with it.
        'after_fn': lambda pipe: (
            ['catch_errors'] if pipe.startswith("catch_errors") else []),
    },
    {
        'name': 'dlo',
        'after_fn': lambda _junk: [
            'catch_errors', 'gatekeeper', 'proxy_logging'],
    },
]
class Application(object):
    """WSGI application for the proxy server."""

    def __init__(self, conf, memcache=None, logger=None, account_ring=None,
                 container_ring=None, object_ring=None):
        """
        Read the proxy configuration and set up rings, logging and tunables.

        :param conf: dict of configuration options (None is treated as {})
        :param memcache: optional memcache client; if None, __call__ looks
                         one up from the WSGI environment
        :param logger: optional logger; one is created if None
        :param account_ring: optional account Ring (loaded from swift_dir
                             if not given)
        :param container_ring: optional container Ring (likewise)
        :param object_ring: optional object Ring (likewise)
        :raises ValueError: if request_node_count, read_affinity,
                            write_affinity or write_affinity_node_count is
                            invalid
        """
        if conf is None:
            conf = {}
        if logger is None:
            self.logger = get_logger(conf, log_route='proxy-server')
        else:
            self.logger = logger

        swift_dir = conf.get('swift_dir', '/etc/swift')
        self.node_timeout = int(conf.get('node_timeout', 10))
        self.recoverable_node_timeout = int(
            conf.get('recoverable_node_timeout', self.node_timeout))
        self.conn_timeout = float(conf.get('conn_timeout', 0.5))
        self.client_timeout = int(conf.get('client_timeout', 60))
        self.put_queue_depth = int(conf.get('put_queue_depth', 10))
        self.object_chunk_size = int(conf.get('object_chunk_size', 65536))
        self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
        self.trans_id_suffix = conf.get('trans_id_suffix', '')
        self.post_quorum_timeout = float(conf.get('post_quorum_timeout', 0.5))
        self.error_suppression_interval = \
            int(conf.get('error_suppression_interval', 60))
        self.error_suppression_limit = \
            int(conf.get('error_suppression_limit', 10))
        self.recheck_container_existence = \
            int(conf.get('recheck_container_existence', 60))
        self.recheck_account_existence = \
            int(conf.get('recheck_account_existence', 60))
        self.allow_account_management = \
            config_true_value(conf.get('allow_account_management', 'no'))
        self.object_post_as_copy = \
            config_true_value(conf.get('object_post_as_copy', 'true'))
        self.object_ring = object_ring or Ring(swift_dir, ring_name='object')
        self.container_ring = container_ring or Ring(swift_dir,
                                                     ring_name='container')
        self.account_ring = account_ring or Ring(swift_dir,
                                                 ring_name='account')
        self.memcache = memcache
        mimetypes.init(mimetypes.knownfiles +
                       [os.path.join(swift_dir, 'mime.types')])
        self.account_autocreate = \
            config_true_value(conf.get('account_autocreate', 'no'))
        self.expiring_objects_account = \
            (conf.get('auto_create_account_prefix') or '.') + \
            (conf.get('expiring_objects_account_name') or 'expiring_objects')
        self.expiring_objects_container_divisor = \
            int(conf.get('expiring_objects_container_divisor') or 86400)
        self.max_containers_per_account = \
            int(conf.get('max_containers_per_account') or 0)
        self.max_containers_whitelist = [
            a.strip()
            for a in conf.get('max_containers_whitelist', '').split(',')
            if a.strip()]
        self.deny_host_headers = [
            host.strip() for host in
            conf.get('deny_host_headers', '').split(',') if host.strip()]
        self.rate_limit_after_segment = \
            int(conf.get('rate_limit_after_segment', 10))
        self.rate_limit_segments_per_sec = \
            int(conf.get('rate_limit_segments_per_sec', 1))
        self.log_handoffs = config_true_value(conf.get('log_handoffs', 'true'))
        self.cors_allow_origin = [
            a.strip()
            for a in conf.get('cors_allow_origin', '').split(',')
            if a.strip()]
        self.strict_cors_mode = config_true_value(
            conf.get('strict_cors_mode', 't'))
        self.node_timings = {}
        self.timing_expiry = int(conf.get('timing_expiry', 300))
        self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
        self.max_large_object_get_time = float(
            conf.get('max_large_object_get_time', '86400'))
        # Each node-count setting is parsed by a helper so that it gets its
        # own closure. (Previously both settings were parsed into the same
        # local variable that the lambdas closed over, so parsing
        # write_affinity_node_count overwrote the value request_node_count
        # returned.)
        self.request_node_count = self._parse_node_count(
            conf.get('request_node_count', '2 * replicas'),
            'request_node_count')
        try:
            self._read_affinity = read_affinity = conf.get('read_affinity', '')
            self.read_affinity_sort_key = affinity_key_function(read_affinity)
        except ValueError as err:
            # make the message a little more useful
            raise ValueError("Invalid read_affinity value: %r (%s)" %
                             (read_affinity, err))
        try:
            write_affinity = conf.get('write_affinity', '')
            self.write_affinity_is_local_fn \
                = affinity_locality_predicate(write_affinity)
        except ValueError as err:
            # make the message a little more useful
            raise ValueError("Invalid write_affinity value: %r (%s)" %
                             (write_affinity, err))
        self.write_affinity_node_count = self._parse_node_count(
            conf.get('write_affinity_node_count', '2 * replicas'),
            'write_affinity_node_count')
        # swift_owner_headers are stripped by the account and container
        # controllers; we should extend header stripping to object controller
        # when a privileged object header is implemented.
        swift_owner_headers = conf.get(
            'swift_owner_headers',
            'x-container-read, x-container-write, '
            'x-container-sync-key, x-container-sync-to, '
            'x-account-meta-temp-url-key, x-account-meta-temp-url-key-2, '
            'x-account-access-control')
        self.swift_owner_headers = [
            name.strip().title()
            for name in swift_owner_headers.split(',') if name.strip()]
        # Initialization was successful, so now apply the client chunk size
        # parameter as the default read / write buffer size for the network
        # sockets.
        #
        # NOTE WELL: This is a class setting, so until we get set this on a
        # per-connection basis, this affects reading and writing on ALL
        # sockets, those between the proxy servers and external clients, and
        # those between the proxy servers and the other internal servers.
        #
        # ** Because it affects the client as well, currently, we use the
        # client chunk size as the govenor and not the object chunk size.
        socket._fileobject.default_bufsize = self.client_chunk_size
        self.expose_info = config_true_value(
            conf.get('expose_info', 'yes'))
        self.disallowed_sections = list_from_csv(
            conf.get('disallowed_sections'))
        self.admin_key = conf.get('admin_key', None)
        register_swift_info(
            version=swift_version,
            strict_cors_mode=self.strict_cors_mode,
            **constraints.EFFECTIVE_CONSTRAINTS)

    @staticmethod
    def _parse_node_count(setting, name):
        """
        Parse a node-count setting into a function of the replica count.

        Accepted forms (case-insensitive, whitespace-separated): a plain
        integer such as "3", or "<N> * replicas" such as "2 * replicas".

        :param setting: raw configuration string
        :param name: configuration option name, used in the error message
        :returns: callable taking the replica count and returning the
                  number of nodes
        :raises ValueError: if the setting is in neither form, or N is not
                            an integer
        """
        tokens = setting.lower().split()
        if len(tokens) == 1:
            count = int(tokens[0])
            return lambda replicas: count
        if len(tokens) == 3 and tokens[1] == '*' and tokens[2] == 'replicas':
            multiplier = int(tokens[0])
            return lambda replicas: multiplier * replicas
        raise ValueError('Invalid %s value: %r' % (name, ' '.join(tokens)))

    def check_config(self):
        """
        Check the configuration for possible errors.

        Currently this only warns when read_affinity is set but
        sorting_method is not 'affinity', because read_affinity is then
        ignored.
        """
        if self._read_affinity and self.sorting_method != 'affinity':
            self.logger.warning(
                "sorting_method is set to '%s', not 'affinity'; "
                "read_affinity setting will have no effect." %
                self.sorting_method)

    def get_controller(self, path):
        """
        Get the controller to handle a request.

        :param path: path from request
        :returns: tuple of (controller class, path dictionary)
        :raises: ValueError (thrown by split_path) if given invalid path
        """
        # "/info" is served by InfoController with the info settings.
        if path == '/info':
            d = dict(version=None,
                     expose_info=self.expose_info,
                     disallowed_sections=self.disallowed_sections,
                     admin_key=self.admin_key)
            return InfoController, d

        # Split the path into 1 to 4 segments: version, account, container,
        # object (the trailing True asks split_path to keep the remainder
        # in the last segment).
        version, account, container, obj = split_path(path, 1, 4, True)
        d = dict(version=version,
                 account_name=account,
                 container_name=container,
                 object_name=obj)
        # The deepest segment that is present decides the controller.
        if obj and container and account:
            return ObjectController, d
        elif container and account:
            return ContainerController, d
        elif account and not container and not obj:
            return AccountController, d
        return None, d

    def __call__(self, env, start_response):
        """
        WSGI entry point.

        Wraps env in swob.Request object and passes it down.

        :param env: WSGI environment dictionary
        :param start_response: WSGI callable
        """
        # Bound before the try so the UnicodeError handler never hits an
        # unbound local if the error is raised before req is built.
        req = None
        try:
            # If no memcache client was supplied at construction time, look
            # one up from the WSGI environment.
            if self.memcache is None:
                self.memcache = cache_from_env(env)
            # Copy X-Storage-Token into X-Auth-Token when only the former
            # is present.
            req = self.update_request(Request(env))
            # handle_request returns a WSGI callable (e.g. swob.Response).
            return self.handle_request(req)(env, start_response)
        except UnicodeError:
            err = HTTPPreconditionFailed(
                request=req, body='Invalid UTF8 or contains NULL')
            return err(env, start_response)
        except (Exception, Timeout):
            start_response('500 Server Error',
                           [('Content-Type', 'text/plain')])
            return ['Internal server error.\n']

    def update_request(self, req):
        """
        Copy X-Storage-Token into X-Auth-Token when only the former is set.

        :param req: swob.Request object (modified in place)
        :returns: the same request object
        """
        if 'x-storage-token' in req.headers and \
                'x-auth-token' not in req.headers:
            req.headers['x-auth-token'] = req.headers['x-storage-token']
        return req

    def handle_request(self, req):
        """
        Entry point for proxy server.

        Validates the request, chooses and instantiates the controller for
        the request path, runs the ``swift.authorize`` hook (if any) and
        dispatches to the controller method named after the HTTP verb.

        Should return a WSGI-style callable (such as swob.Response).

        :param req: swob.Request object
        """
        try:
            # Reset the statsd prefix; it is narrowed to the controller's
            # server type once the controller is known.
            self.logger.set_statsd_prefix('proxy-server')
            # Reject a negative Content-Length.
            if req.content_length and req.content_length < 0:
                self.logger.increment('errors')
                return HTTPBadRequest(request=req,
                                      body='Invalid Content-Length')

            # Reject paths that are not valid UTF-8 (or contain NULL).
            try:
                if not check_utf8(req.path_info):
                    self.logger.increment('errors')
                    return HTTPPreconditionFailed(
                        request=req, body='Invalid UTF8 or contains NULL')
            except UnicodeError:
                self.logger.increment('errors')
                return HTTPPreconditionFailed(
                    request=req, body='Invalid UTF8 or contains NULL')

            try:
                controller, path_parts = self.get_controller(req.path)
            except ValueError:
                # split_path rejected the path.
                self.logger.increment('errors')
                return HTTPNotFound(request=req)
            if not controller:
                # Parseable path that does not name a resource type.
                self.logger.increment('errors')
                return HTTPPreconditionFailed(request=req, body='Bad URL')
            # Reject requests whose Host header (with any port stripped) is
            # in the configured deny_host_headers list.
            if self.deny_host_headers and \
                    req.host.split(':')[0] in self.deny_host_headers:
                return HTTPForbidden(request=req, body='Invalid host header')

            # Narrow the statsd prefix to the controller's server_type.
            self.logger.set_statsd_prefix('proxy-server.' +
                                          controller.server_type.lower())
            controller = controller(self, **path_parts)
            if 'swift.trans_id' not in req.environ:
                # if this wasn't set by an earlier middleware, set it now
                trans_id = generate_trans_id(self.trans_id_suffix)
                req.environ['swift.trans_id'] = trans_id
                self.logger.txn_id = trans_id
            req.headers['x-trans-id'] = req.environ['swift.trans_id']
            controller.trans_id = req.environ['swift.trans_id']
            # Record the client address on the logger for later log lines.
            self.logger.client_ip = get_remote_client(req)
            try:
                # The handler is the controller method named after the verb.
                handler = getattr(controller, req.method)
                # Only methods carrying ``publicly_accessible`` (presumably
                # set by the @public decorator) may be dispatched to; any
                # other method raises AttributeError and becomes a 405.
                getattr(handler, 'publicly_accessible')
            except AttributeError:
                allowed_methods = getattr(controller, 'allowed_methods', set())
                return HTTPMethodNotAllowed(
                    request=req, headers={'Allow': ', '.join(allowed_methods)})
            if 'swift.authorize' in req.environ:
                # We call authorize before the handler, always. If authorized,
                # we remove the swift.authorize hook so isn't ever called
                # again. If not authorized, we return the denial unless the
                # controller's method indicates it'd like to gather more
                # information and try again later.
                resp = req.environ['swift.authorize'](req)
                if not resp:
                    # No resp means authorized, no delayed recheck required.
                    del req.environ['swift.authorize']
                else:
                    # Response indicates denial, but we might delay the denial
                    # and recheck later. If not delayed, return the error now.
                    if not getattr(handler, 'delay_denial', None):
                        return resp
            # Save off original request method (GET, POST, etc.) in case it
            # gets mutated during handling. This way logging can display the
            # method the client actually sent.
            req.environ['swift.orig_req_method'] = req.method
            return handler(req)
        except HTTPException as error_response:
            return error_response
        except (Exception, Timeout):
            self.logger.exception(_('ERROR Unhandled exception in request'))
            return HTTPServerError(request=req)

    def sort_nodes(self, nodes):
        """
        Sorts nodes in-place (and returns the sorted list) according to
        the configured strategy. The default "sorting" is to randomly
        shuffle the nodes. If the "timing" strategy is chosen, the nodes
        are sorted according to the stored timing data. If the "affinity"
        strategy is chosen, they are sorted with read_affinity_sort_key.

        :param nodes: list of node dicts (each needs an 'ip' key for
                      "timing")
        :returns: the same list object, reordered
        """
        # In the case of timing sorting, shuffling ensures that close timings
        # (ie within the rounding resolution) won't prefer one over another.
        # Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/)
        shuffle(nodes)
        if self.sorting_method == 'timing':
            now = time()

            def key_func(node):
                # Nodes without an unexpired timing entry sort first (-1.0).
                timing, expires = self.node_timings.get(node['ip'], (-1.0, 0))
                return timing if expires > now else -1.0
            nodes.sort(key=key_func)
        elif self.sorting_method == 'affinity':
            nodes.sort(key=self.read_affinity_sort_key)
        return nodes

    def set_node_timing(self, node, timing):
        """
        Record a response time for a node (only with sorting_method
        'timing').

        :param node: node dict; its 'ip' is the key in self.node_timings
        :param timing: measured time; rounded to 3 decimal places
        """
        if self.sorting_method != 'timing':
            return
        now = time()
        timing = round(timing, 3)  # sort timings to the millisecond
        self.node_timings[node['ip']] = (timing, now + self.timing_expiry)

    def error_limited(self, node):
        """
        Check if the node is currently error limited.

        If the node's last error is older than error_suppression_interval
        seconds, its error history is removed and it is treated as healthy.

        :param node: dictionary of node to check (mutated when its error
                     history has expired)
        :returns: True if error limited, False otherwise
        """
        now = time()
        if 'errors' not in node:
            return False
        if 'last_error' in node and \
                node['last_error'] < now - self.error_suppression_interval:
            # The last error is too old to count: forget the history.
            del node['last_error']
            node.pop('errors', None)
            return False
        limited = node['errors'] > self.error_suppression_limit
        if limited:
            self.logger.debug(
                _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
        return limited

    def error_limit(self, node, msg):
        """
        Mark a node as error limited. This immediately pretends the
        node received enough errors to trigger error suppression. Use
        this for errors like Insufficient Storage. For other errors
        use :func:`error_occurred`.

        :param node: dictionary of node to error limit
        :param msg: error message
        """
        node['errors'] = self.error_suppression_limit + 1
        node['last_error'] = time()
        self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
                          {'msg': msg, 'ip': node['ip'],
                          'port': node['port'], 'device': node['device']})

    def error_occurred(self, node, msg):
        """
        Handle logging, and handling of errors.

        Increments the node's error count and records the error time.

        :param node: dictionary of node to handle errors for
        :param msg: error message
        """
        node['errors'] = node.get('errors', 0) + 1
        node['last_error'] = time()
        self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
                          {'msg': msg, 'ip': node['ip'],
                          'port': node['port'], 'device': node['device']})

    def iter_nodes(self, ring, partition, node_iter=None):
        """
        Yields nodes for a ring partition, skipping over error
        limited nodes and stopping at the configurable number of
        nodes. If a node yielded subsequently gets error limited, an
        extra node will be yielded to take its place.

        Note that if you're going to iterate over this concurrently from
        multiple greenthreads, you'll want to use a
        swift.common.utils.GreenthreadSafeIterator to serialize access.
        Otherwise, you may get ValueErrors from concurrent access. (You also
        may not, depending on how logging is configured, the vagaries of
        socket IO and eventlet, and the phase of the moon.)

        :param ring: ring to get yield nodes from
        :param partition: ring partition to yield nodes for
        :param node_iter: optional iterable of nodes to try. Useful if you
            want to filter or reorder the nodes.
        """
        part_nodes = ring.get_part_nodes(partition)
        if node_iter is None:
            node_iter = itertools.chain(part_nodes,
                                        ring.get_more_nodes(partition))
        num_primary_nodes = len(part_nodes)

        # Use of list() here forcibly yanks the first N nodes (the primary
        # nodes) from node_iter, so the rest of its values are handoffs.
        primary_nodes = self.sort_nodes(
            list(itertools.islice(node_iter, num_primary_nodes)))
        handoff_nodes = node_iter
        nodes_left = self.request_node_count(len(primary_nodes))

        for node in primary_nodes:
            if not self.error_limited(node):
                yield node
                # Re-checked after the yield: the consumer may have error
                # limited the node, in which case it does not count.
                if not self.error_limited(node):
                    nodes_left -= 1
                    if nodes_left <= 0:
                        return
        handoffs = 0
        for node in handoff_nodes:
            if not self.error_limited(node):
                handoffs += 1
                if self.log_handoffs:
                    self.logger.increment('handoff_count')
                    self.logger.warning(
                        'Handoff requested (%d)' % handoffs)
                    if handoffs == len(primary_nodes):
                        self.logger.increment('handoff_all_count')
                yield node
                if not self.error_limited(node):
                    nodes_left -= 1
                    if nodes_left <= 0:
                        return

    def exception_occurred(self, node, typ, additional_info):
        """
        Handle logging of generic exceptions.

        :param node: dictionary of node to log the error for
        :param typ: server type
        :param additional_info: additional information to log
        """
        self.logger.exception(
            _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
              '%(info)s'),
            {'type': typ, 'ip': node['ip'], 'port': node['port'],
             'device': node['device'], 'info': additional_info})

    def modify_wsgi_pipeline(self, pipe):
        """
        Called during WSGI pipeline creation. Modifies the WSGI pipeline
        context to ensure that mandatory middleware is present in the pipeline.

        :param pipe: A PipelineWrapper object
        """
        pipeline_was_modified = False
        # Iterate in reverse so that filters inserted at the same position
        # end up in the order they are listed in required_filters.
        for filter_spec in reversed(required_filters):
            filter_name = filter_spec['name']
            if filter_name not in pipe:
                afters = filter_spec.get('after_fn', lambda _junk: [])(pipe)
                insert_at = 0
                for after in afters:
                    try:
                        insert_at = max(insert_at, pipe.index(after) + 1)
                    except ValueError:  # not in pipeline; ignore it
                        pass
                self.logger.info(
                    'Adding required filter %s to pipeline at position %d' %
                    (filter_name, insert_at))
                ctx = pipe.create_filter(filter_name)
                pipe.insert_filter(ctx, index=insert_at)
                pipeline_was_modified = True

        if pipeline_was_modified:
            self.logger.info("Pipeline was modified. New pipeline is \"%s\".",
                             pipe)
        else:
            self.logger.debug("Pipeline is \"%s\"", pipe)
def app_factory(global_conf, **local_conf):
    """
    paste.deploy app factory for creating WSGI proxy apps.

    The proxy configuration is ``global_conf`` overlaid with ``local_conf``
    (keys in ``local_conf`` win). The resulting Application has its
    configuration checked before it is returned.

    :param global_conf: global configuration mapping
    :param local_conf: per-app configuration options
    :returns: a configured Application instance
    """
    merged_conf = global_conf.copy()
    merged_conf.update(local_conf)
    proxy_app = Application(merged_conf)
    # Log warnings for settings that would otherwise silently do nothing.
    proxy_app.check_config()
    return proxy_app