OpenStack源码学习笔记2
source link: https://www.hi-roy.com/posts/openstack%E6%BA%90%E7%A0%81%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B02/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
OpenStack源码学习笔记2
上次学习了Nova创建虚拟机的过程,这次来看一下Glance是如何上传镜像的。相比于Nova,Glance源码使用了大量的代理模式和装饰器模式,阅读代码时候一个不仔细就会一脸懵X。根据上次说的OpenStack套路,我们通过setup.cfg
直奔主题——glance/cmd/api.py
:
def main():
    """Entry point of the ``glance-api`` service.

    Parses configuration, applies eventlet/logging/notifier defaults,
    enables or disables osprofiler depending on ``cfg.CONF.profiler.enabled``,
    then creates the WSGI server, starts it with the ``glance-api`` paste
    application (default port 9292) and waits on it.  Exceptions listed in
    ``KNOWN_EXCEPTIONS`` are printed and handed to ``fail``.
    """
    try:
        # Bootstrap sequence: options first, then runtime defaults.
        config.parse_args()
        config.set_config_defaults()
        wsgi.set_eventlet_hub()
        logging.setup(CONF, 'glance')
        notifier.set_defaults()

        if not cfg.CONF.profiler.enabled:
            osprofiler.web.disable()
        else:
            profiler_notifier = osprofiler.notifier.create(
                "Messaging", oslo_messaging, {},
                notifier.get_transport(),
                "glance", "api", cfg.CONF.bind_host)
            osprofiler.notifier.set(profiler_notifier)
            osprofiler.web.enable(cfg.CONF.profiler.hmac_keys)

        # The flag asks the server to initialize glance_store as well.
        api_server = wsgi.Server(initialize_glance_store=True)
        paste_app = config.load_paste_app('glance-api')
        api_server.start(paste_app, default_port=9292)
        api_server.wait()
    except KNOWN_EXCEPTIONS as err:
        print(err)
        fail(err)
配置加载与路由绑定
和Nova一样,这个文件主要作用就是加载配置、创建WSGI Server并运行,这里我们注意一下initialize_glance_store=True
这里,新版中关于存储的部分已经独立出项目叫做glance_store,这里还对这部分进行了初始化,我们跟进glance/common/wsgi.py
中:
def initialize_glance_store():
    """Prepare the glance_store library for this service.

    Registers the store options on ``CONF``, creates the stores from that
    configuration and finally verifies the default store.
    """
    # Options must be registered before stores are created from CONF.
    glance_store.register_opts(CONF)
    glance_store.create_stores(CONF)
    glance_store.verify_default_store()
# WSGI server wrapper (excerpt: "......" marks code omitted by the article,
# and the original indentation was lost when the page was extracted).
class Server(object):
......
# Remembers whether configure() should also initialize glance_store.
def __init__(self, threads=1000, initialize_glance_store=False):
......
self.initialize_glance_store = initialize_glance_store
......
# Stores the application and default port, then configures the server
# and starts the WSGI loop.
def start(self, application, default_port):
self.application = application
self.default_port = default_port
self.configure()
self.start_wsgi()
# Applies header-line and socket-timeout settings from CONF, configures
# the socket, and initializes glance_store when the flag was set.
def configure(self, old_conf=None, has_changed=None):
eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
# A falsy timeout value is normalized to None.
self.client_socket_timeout = CONF.client_socket_timeout or None
self.configure_socket(old_conf, has_changed)
if self.initialize_glance_store:
initialize_glance_store()
这里我们跟进glance_store/backend.py
中:
def create_stores(conf=CONF):
    """Configure every loaded store and register its URI schemes.

    Each store yielded by ``_load_stores(conf)`` is asked for its schemes
    and configured; stores raising ``NotImplementedError`` are skipped.
    Every scheme of a store is then mapped to the store instance, its
    location class and its entry name through
    ``location.register_scheme_map``.

    :param conf: configuration object handed to ``_load_stores``
    :returns: number of stores that were registered
    :raises exceptions.BackendException: if a store reports no schemes
    """
    registered = 0
    for store_entry, store_instance in _load_stores(conf):
        try:
            schemes = store_instance.get_schemes()
            store_instance.configure(re_raise_bsc=False)
        except NotImplementedError:
            # This store does not provide the required hooks; skip it.
            continue

        if not schemes:
            raise exceptions.BackendException('Unable to register store %s. '
                                              'No schemes associated with it.'
                                              % store_entry)

        LOG.debug("Registering store %s with schemes %s",
                  store_entry, schemes)

        loc_cls = store_instance.get_store_location_class()
        location.register_scheme_map({
            scheme: {
                'store': store_instance,
                'location_class': loc_cls,
                'store_entry': store_entry,
            }
            for scheme in schemes
        })
        registered += 1
    return registered
在这里会根据/etc/glance/glance-api.conf
中的配置信息找到对应的driver(位于glance_store/_drivers
目录)并配置,然后调用register_scheme_map()
进行绑定:
# Module-level registry filled by register_scheme_map(): scheme -> value.
SCHEME_TO_CLS_MAP = {}


def register_scheme_map(scheme_map):
    """Merge ``scheme_map`` into the global ``SCHEME_TO_CLS_MAP``.

    Every scheme/value pair is logged at debug level and then stored,
    replacing any entry already registered under the same scheme.

    :param scheme_map: mapping of scheme name to the value to register
    """
    for scheme, entry in scheme_map.items():
        LOG.debug("Registering scheme %s with %s", scheme, entry)
        SCHEME_TO_CLS_MAP[scheme] = entry
这样就完成了所需的准备工作。
glance镜像上传分为2个步骤,首先在数据库中创建一条记录,并返回相关信息,此时使用glance image-list
命令可以查看到一个空镜像,状态为queued
。然后再上传镜像数据,上传完成后进入active
状态,代码均来源于rocky版。
create
根据文档创建镜像是向/v2/images
发送POST请求,然后再结合glance-api-paste.ini
中的定义:
[pipeline:glance-api]
pipeline = cors healthcheck http_proxy_to_wsgi versionnegotiation osprofiler unauthenticated-context rootapp
[composite:rootapp]
paste.composite_factory = glance.api:root_app_factory
/: apiversions
/v1: apiv1app
/v2: apiv2app
[app:apiversions]
paste.app_factory = glance.api.versions:create_resource
[app:apiv1app]
paste.app_factory = glance.api.v1.router:API.factory
[app:apiv2app]
paste.app_factory = glance.api.v2.router:API.factory
根据glance/api/v2/router.py
中的API()
定义,找到实际处理post请求的函数为glance/api/v2/images.py
中的ImagesController
类的create
方法:
def create(self, req, image, extra_properties, tags):
    """Create a new image record from the request payload.

    Obtains the image factory and repository from the gateway for the
    request context, builds the image via ``new_image`` and adds it to
    the repository.  Domain exceptions are translated to HTTP errors:

    * ``DuplicateLocation`` / ``Invalid`` / ``TypeError`` -> 400
    * ``ReservedProperty`` / ``ReadonlyProperty`` / ``Forbidden`` -> 403
    * ``LimitExceeded`` -> 413
    * ``Duplicate`` -> 409
    * ``NotAuthenticated`` -> 401

    :param req: incoming request; ``req.context`` is passed to the gateway
    :param image: dict of image attributes, expanded as keyword arguments
    :param extra_properties: forwarded to ``new_image``
    :param tags: forwarded to ``new_image``
    :returns: the image object produced by the factory
    """
    image_factory = self.gateway.get_image_factory(req.context)
    image_repo = self.gateway.get_repo(req.context)
    try:
        image = image_factory.new_image(extra_properties=extra_properties,
                                        tags=tags, **image)
        image_repo.add(image)
    except (exception.DuplicateLocation,
            exception.Invalid) as e:
        raise webob.exc.HTTPBadRequest(explanation=e.msg)
    except (exception.ReservedProperty,
            exception.ReadonlyProperty) as e:
        raise webob.exc.HTTPForbidden(explanation=e.msg)
    except exception.Forbidden as e:
        LOG.debug("User not permitted to create image")
        raise webob.exc.HTTPForbidden(explanation=e.msg)
    except exception.LimitExceeded as e:
        # warn() is a deprecated alias of warning(); use the supported name.
        LOG.warning(encodeutils.exception_to_unicode(e))
        raise webob.exc.HTTPRequestEntityTooLarge(
            explanation=e.msg, request=req, content_type='text/plain')
    except exception.Duplicate as e:
        raise webob.exc.HTTPConflict(explanation=e.msg)
    except exception.NotAuthenticated as e:
        raise webob.exc.HTTPUnauthorized(explanation=e.msg)
    except TypeError as e:
        # Raised e.g. when **image carries unexpected keyword arguments.
        LOG.debug(encodeutils.exception_to_unicode(e))
        raise webob.exc.HTTPBadRequest(explanation=e)
    return image
这段代码看上去简单,实际内含玄机,如果跟进gateway.get_image_factory
和gateway.get_repo
函数,会发现作者用了大量的装饰器模式和代理模式:
# glance/gateway.py
def get_image_factory(self, context):
    """Build the layered image factory used for ``context``.

    Starting from the plain domain factory, each layer wraps the previous
    one in this order: location (store) -> quota -> policy -> notifier ->
    property protection (only when enabled) -> authorization.

    :param context: request context passed to every proxy layer
    :returns: the outermost (authorization) factory proxy
    """
    factory = glance.domain.ImageFactory()
    factory = glance.location.ImageFactoryProxy(
        factory, context, self.store_api, self.store_utils)
    factory = glance.quota.ImageFactoryProxy(
        factory, context, self.db_api, self.store_utils)
    factory = policy.ImageFactoryProxy(factory, context, self.policy)
    factory = glance.notifier.ImageFactoryProxy(
        factory, context, self.notifier)

    # Optional layer: inserted only when property protection is enabled.
    if property_utils.is_property_protection_enabled():
        property_rules = property_utils.PropertyRules(self.policy)
        factory = property_protections.ProtectedImageFactoryProxy(
            factory, context, property_rules)

    return authorization.ImageFactoryProxy(factory, context)
def get_repo(self, context):
    """Build the layered image repository used for ``context``.

    Starting from the database-backed repository, each layer wraps the
    previous one in this order: location (store) -> quota -> policy ->
    notifier -> property protection (only when enabled) -> authorization.

    :param context: request context passed to every proxy layer
    :returns: the outermost (authorization) repository proxy
    """
    repo = glance.db.ImageRepo(context, self.db_api)
    repo = glance.location.ImageRepoProxy(
        repo, context, self.store_api, self.store_utils)
    repo = glance.quota.ImageRepoProxy(
        repo, context, self.db_api, self.store_utils)
    repo = policy.ImageRepoProxy(repo, context, self.policy)
    repo = glance.notifier.ImageRepoProxy(repo, context, self.notifier)

    # Optional layer: inserted only when property protection is enabled.
    if property_utils.is_property_protection_enabled():
        property_rules = property_utils.PropertyRules(self.policy)
        repo = property_protections.ProtectedImageRepoProxy(
            repo, context, property_rules)

    return authorization.ImageRepoProxy(repo, context)
阅读这里的时候一个不仔细逻辑就断了,要像剥洋葱一样,一层一层剥开它的心。这里我就不记录追踪细节了,经过一层层判断后,image_factory.new_image
函数最终进入glance/domain/__init__.py
中并返回了一个Image
类型实例。
然后将这个实例传递给image_repo.add
函数,这个函数再经过一层层判断,进入glance/db/__init__.py
中调用ImageRepo
的add()
方法,在这个方法中最终调用了glance/db/sqlalchemy/api.py
中的image_create()
函数来在数据库中创建新记录。
如果没有发生错误,则返回创建的空镜像信息给客户端。
upload
文档中定义,上传数据行为是向/v2/images/{image_id}/file
发送PUT请求,实际处理函数为glance/api/v2/image_data.py
中的ImageDataController
类的upload
方法:
......
# Handler for uploading image data (excerpt: the enclosing class, the
# handlers of the outer try blocks and the original indentation are not
# shown in this article snippet).
@utils.mutating
def upload(self, req, image_id, data, size):
backend = None
image_repo = self.gateway.get_repo(req.context)
image = None
refresher = None
cxt = req.context
try:
# Policy check first, then load the image and mark it as being saved.
self.policy.enforce(cxt, 'upload_image', {})
image = image_repo.get(image_id)
image.status = 'saving'
try:
# Persist the new status; the save expects the stored state 'queued'.
image_repo.save(image, from_state='queued')
# Hand the payload to the image object (backend is None in the lines shown).
image.set_data(data, size, backend=backend)
try:
# Persist the image again, this time expecting the state 'saving'.
image_repo.save(image, from_state='saving')
except exception.NotAuthenticated:
# In the lines shown refresher is never reassigned from None, so only
# the re-raise branch is reachable from this excerpt.
if refresher is not None:
# request a new token to update an image in database
cxt.auth_token = refresher.refresh_token()
image_repo = self.gateway.get_repo(req.context)
image_repo.save(image, from_state='saving')
else:
raise
......
新版本中glance已经可以支持多后端的配置,但还不是稳定版。这里以单后端为例,首先将状态改为saving,然后调用set_data
函数。由于某些不可描述的原因,我没法启动程序进行单步调试,只能使用IDE的跳转功能,结果这里兜兜转转绕了很久。这里我就直接给出答案吧,最终调用的是glance/location.py
的ImageProxy
类中的方法。这里面的关键点在于image其实是由ImageProxy
实例代理的,转换发生在初始化对象时候创建的Helper
类,这个类有一个proxy
方法用来将原始Image
类型转换成ImageProxy
类型。这里具体就不再详细说明了,回到set_data
函数定义:
# Stores the image payload through the store API and records the result on
# the wrapped image (excerpt: "......" omits code, including the place
# where `verifier` is defined; original indentation was lost).
def set_data(self, data, size=None, backend=None):
......
hashing_algo = CONF['hashing_algorithm']
# Two call paths: add_with_multihash (takes `backend`) when
# CONF.enabled_backends is set, otherwise add_to_backend_with_multihash.
# Both read the data through CooperativeReader wrapped in a LimitingReader
# bounded by CONF.image_size_cap.
if CONF.enabled_backends:
(location, size, checksum,
multihash, loc_meta) = self.store_api.add_with_multihash(
CONF,
self.image.image_id,
utils.LimitingReader(utils.CooperativeReader(data),
CONF.image_size_cap),
size,
backend,
hashing_algo,
context=self.context,
verifier=verifier)
else:
(location, size, checksum,
multihash, loc_meta) = self.store_api.add_to_backend_with_multihash(
CONF,
self.image.image_id,
utils.LimitingReader(utils.CooperativeReader(data),
CONF.image_size_cap),
size,
hashing_algo,
context=self.context,
verifier=verifier)
# Record where the data was stored plus its size and hashes, then mark
# the image as active.
self.image.locations = [{'url': location, 'metadata': loc_meta, 'status': 'active'}]
self.image.size = size
self.image.checksum = checksum
self.image.os_hash_value = multihash
self.image.os_hash_algo = hashing_algo
self.image.status = 'active'
这里的store_api
默认就是glance_store
了,其中add_with_multihash
是启用多后端时候调用,add_to_backend_with_multihash
启用单后端时候调用。这里以单后端为例:
def add_to_backend_with_multihash(conf, image_id, data, size, hashing_algo,
                                  scheme=None, context=None, verifier=None):
    """Add image data to the store selected by ``scheme``.

    :param conf: configuration; consulted only when ``scheme`` is ``None``
    :param image_id: identifier of the image being stored
    :param data: image data, forwarded unchanged
    :param size: image size, forwarded unchanged
    :param hashing_algo: hash algorithm name, forwarded unchanged
    :param scheme: store scheme; when ``None`` the value of
        ``conf['glance_store']['default_store']`` is used
    :param context: forwarded unchanged
    :param verifier: forwarded unchanged
    :returns: the result of ``store_add_to_backend_with_multihash``
    """
    chosen_scheme = (conf['glance_store']['default_store']
                     if scheme is None else scheme)
    backend_store = get_store_from_scheme(chosen_scheme)
    return store_add_to_backend_with_multihash(
        image_id, data, size, hashing_algo, backend_store, context, verifier)
其中get_store_from_scheme
函数作用是获取到文章开头所说的绑定到SCHEME_TO_CLS_MAP
中的对应的driver
,然后经过store_add_to_backend_with_multihash
函数进入相应的driver
的add
方法,这里以Ceph的块存储RBD(RADOS Block Device)为例,函数位于glance_store/_drivers/rbd.py
:
# RBD driver `add`: creates an RBD image named after image_id, writes the
# incoming data chunk by chunk while updating the digests, and returns a
# tuple of (URI, size, md5 hex digest, hashing_algo hex digest, metadata).
# Excerpt: "......" omits code and the original indentation was lost, so
# the nesting of the statements below cannot be read from this snippet.
@driver.back_compat_add
@capabilities.check
def add(self, image_id, image_file, image_size, hashing_algo, context=None,
verifier=None):
# Two digests are maintained: md5 and the algorithm named by hashing_algo.
checksum = hashlib.md5()
os_hash_value = hashlib.new(str(hashing_algo))
image_name = str(image_id)
with self.get_connection(conffile=self.conf_file,
rados_id=self.user) as conn:
# fsid stays None when the connection object has no get_fsid().
fsid = None
if hasattr(conn, 'get_fsid'):
fsid = encodeutils.safe_decode(conn.get_fsid())
with conn.open_ioctx(self.pool) as ioctx:
# order is the base-2 logarithm of WRITE_CHUNKSIZE; it is passed to
# _create_image below.
order = int(math.log(self.WRITE_CHUNKSIZE, 2))
LOG.debug('creating image %s with order %d and size %d',
image_name, order, image_size)
if image_size == 0:
LOG.warning(_("since image size is zero we will be doing "
"resize-before-write for each chunk which "
"will be considerably slower than normal"))
# Create the image; an already existing one is reported as Duplicate.
try:
loc = self._create_image(fsid, conn, ioctx, image_name,
image_size, order)
except rbd.ImageExists:
msg = _('RBD image %s already exists') % image_id
raise exceptions.Duplicate(message=msg)
try:
with rbd.Image(ioctx, image_name) as image:
bytes_written = 0
offset = 0
chunks = utils.chunkreadable(image_file,
self.WRITE_CHUNKSIZE)
for chunk in chunks:
# If the image size provided is zero we need to do
# a resize for the amount we are writing. This will
# be slower so setting a higher chunk size may
# speed things up a bit.
if image_size == 0:
chunk_length = len(chunk)
length = offset + chunk_length
bytes_written += chunk_length
LOG.debug(_("resizing image to %s KiB") %
(length / units.Ki))
image.resize(length)
LOG.debug(_("writing chunk at offset %s") %
(offset))
# The value returned by write() advances the offset for the next chunk.
offset += image.write(chunk, offset)
os_hash_value.update(chunk)
checksum.update(chunk)
if verifier:
verifier.update(chunk)
# When the location names a snapshot, create and protect it.
if loc.snapshot:
image.create_snap(loc.snapshot)
image.protect_snap(loc.snapshot)
......
# Size was given as zero: report the byte count accumulated while writing.
if image_size == 0:
image_size = bytes_written
metadata = {}
if self.backend_group:
metadata['store'] = u"%s" % self.backend_group
return (loc.get_uri(),
image_size,
checksum.hexdigest(),
os_hash_value.hexdigest(),
metadata)
这个函数首先初始化用于计算hash值的对象,然后创建连接并创建RBD镜像(如果同名镜像已存在则抛出Duplicate异常),接着分块写入数据并同步更新hash值,最后将位置、大小、校验值等数据返回给调用者。最后glance中修改状态为active,整个镜像上传过程就结束了。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK