
OpenStack Source Code Study Notes 2

source link: https://www.hi-roy.com/posts/openstack%E6%BA%90%E7%A0%81%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B02/


2019-09-18

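Last time we walked through how Nova creates a virtual machine; this time we look at how Glance uploads an image. Compared with Nova, the Glance source makes heavy use of the proxy and decorator patterns, so it is easy to lose the thread if you read carelessly. Following the usual OpenStack approach described last time, we use setup.cfg to go straight to the entry point, glance/cmd/api.py: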

def main():
    try:
        config.parse_args()
        config.set_config_defaults()
        wsgi.set_eventlet_hub()
        logging.setup(CONF, 'glance')
        notifier.set_defaults()
        if cfg.CONF.profiler.enabled:
            _notifier = osprofiler.notifier.create("Messaging",
                                                   oslo_messaging, {},
                                                   notifier.get_transport(),
                                                   "glance", "api",
                                                   cfg.CONF.bind_host)
            osprofiler.notifier.set(_notifier)
            osprofiler.web.enable(cfg.CONF.profiler.hmac_keys)
        else:
            osprofiler.web.disable()
        server = wsgi.Server(initialize_glance_store=True)
        server.start(config.load_paste_app('glance-api'), default_port=9292)
        server.wait()
    except KNOWN_EXCEPTIONS as e:
        print(e)
        fail(e)

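Configuration loading and route binding

As with Nova, this file mainly loads the configuration, creates a WSGI server, and runs it. Note the initialize_glance_store=True argument: in recent versions the storage code has been split out into a separate project called glance_store, and it gets initialized here as well. Following the call into glance/common/wsgi.py: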


def initialize_glance_store():
    """Initialize glance store."""
    glance_store.register_opts(CONF)
    glance_store.create_stores(CONF)
    glance_store.verify_default_store()

class Server(object):
    ......

    def __init__(self, threads=1000, initialize_glance_store=False):
        ......    
        self.initialize_glance_store = initialize_glance_store
        ......

    def start(self, application, default_port):
        self.application = application
        self.default_port = default_port
        self.configure()
        self.start_wsgi()

    def configure(self, old_conf=None, has_changed=None):
        eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
        self.client_socket_timeout = CONF.client_socket_timeout or None
        self.configure_socket(old_conf, has_changed)
        if self.initialize_glance_store:
            initialize_glance_store()

From here we follow create_stores into glance_store/backend.py:

def create_stores(conf=CONF):
    store_count = 0

    for (store_entry, store_instance) in _load_stores(conf):
        try:
            schemes = store_instance.get_schemes()
            store_instance.configure(re_raise_bsc=False)
        except NotImplementedError:
            continue
        if not schemes:
            raise exceptions.BackendException('Unable to register store %s. '
                                              'No schemes associated with it.'
                                              % store_entry)
        else:
            LOG.debug("Registering store %s with schemes %s",
                      store_entry, schemes)
            scheme_map = {}
            loc_cls = store_instance.get_store_location_class()
            for scheme in schemes:
                scheme_map[scheme] = {
                    'store': store_instance,
                    'location_class': loc_cls,
                    'store_entry': store_entry
                }
            location.register_scheme_map(scheme_map)
            store_count += 1
    return store_count

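Here the configuration in /etc/glance/glance-api.conf is used to find the matching drivers (located in the glance_store/_drivers directory) and configure them, after which register_scheme_map() is called to bind each scheme to its driver: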

SCHEME_TO_CLS_MAP = {}
def register_scheme_map(scheme_map):
    for (k, v) in scheme_map.items():
        LOG.debug("Registering scheme %s with %s", k, v)
        SCHEME_TO_CLS_MAP[k] = v

That completes the preparation work.
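
The registration step above can be modelled in a few lines. This is only a simplified sketch: FakeStore and the store names are hypothetical stand-ins rather than glance_store classes, and the lookup function is my guess at the general shape of a scheme lookup, not the library's code.

```python
# Simplified model of the scheme registry. FakeStore and the store names
# are hypothetical stand-ins, not glance_store classes.
SCHEME_TO_CLS_MAP = {}


class FakeStore:
    """Stand-in for a store driver that serves one or more URI schemes."""

    def __init__(self, name, schemes):
        self.name = name
        self._schemes = schemes

    def get_schemes(self):
        return self._schemes


def register_scheme_map(scheme_map):
    # A later registration for the same scheme overwrites the earlier one.
    SCHEME_TO_CLS_MAP.update(scheme_map)


def create_stores(stores):
    """Register every store under each scheme it reports; return the count."""
    count = 0
    for entry, store in stores:
        schemes = store.get_schemes()
        if not schemes:
            raise ValueError("no schemes for store %s" % entry)
        register_scheme_map(
            {s: {"store": store, "store_entry": entry} for s in schemes})
        count += 1
    return count


def get_store_from_scheme(scheme):
    """Look up the driver registered for a scheme (simplified)."""
    if scheme not in SCHEME_TO_CLS_MAP:
        raise KeyError("unknown scheme: %s" % scheme)
    return SCHEME_TO_CLS_MAP[scheme]["store"]


registered = create_stores([
    ("file", FakeStore("filesystem", ("file", "filesystem"))),
    ("rbd", FakeStore("ceph", ("rbd",))),
])
```

One store can serve several schemes, so the map ends up with more keys than there are stores; the count returned is the number of stores, not the number of schemes.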

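Uploading an image to Glance takes two steps. First a record is created in the database and its details are returned; at this point the glance image-list command shows an empty image in the queued status. Then the image data is uploaded, and once the upload finishes the image becomes active. All code shown here is from the Rocky release.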
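
To make the two steps concrete, here is a toy in-memory model of the status changes. ToyImage and ToyRepo are hypothetical names, not Glance classes. The from_state guard imitates the from_state argument that appears in the upload handler later in this post; what that guard protects against is my assumption, not something the source states.

```python
# Toy in-memory model of the two-step flow. ToyImage and ToyRepo are
# hypothetical names, not Glance classes.
class ToyImage:
    def __init__(self, image_id):
        self.image_id = image_id
        self.status = "queued"   # step 1 leaves the record in this state
        self.size = None


class ToyRepo:
    """Stores only the persisted status, keyed by image id."""

    def __init__(self):
        self._status = {}

    def add(self, image):
        self._status[image.image_id] = image.status

    def save(self, image, from_state):
        # Assumption: the update is refused unless the stored status still
        # matches from_state, so a stale or repeated request fails.
        if self._status[image.image_id] != from_state:
            raise RuntimeError("image is not in state %r" % from_state)
        self._status[image.image_id] = image.status

    def status_of(self, image_id):
        return self._status[image_id]


repo = ToyRepo()
image = ToyImage("demo-id")
repo.add(image)                        # step 1: record exists, no data yet
after_create = repo.status_of("demo-id")

image.status = "saving"                # step 2: data upload begins
repo.save(image, from_state="queued")
image.size = len(b"fake image bytes")  # pretend the bytes were stored
image.status = "active"
repo.save(image, from_state="saving")
after_upload = repo.status_of("demo-id")
```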

create

According to the API documentation, an image is created by sending a POST request to /v2/images. Combine that with the definitions in glance-api-paste.ini:

[pipeline:glance-api]
pipeline = cors healthcheck http_proxy_to_wsgi versionnegotiation osprofiler unauthenticated-context rootapp

[composite:rootapp]
paste.composite_factory = glance.api:root_app_factory
/: apiversions
/v1: apiv1app
/v2: apiv2app

[app:apiversions]
paste.app_factory = glance.api.versions:create_resource

[app:apiv1app]
paste.app_factory = glance.api.v1.router:API.factory

[app:apiv2app]
paste.app_factory = glance.api.v2.router:API.factory
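
The composite section maps URL prefixes to applications. A rough way to picture that is a longest-prefix dispatcher like the one below. This is a hypothetical illustration, not the Paste implementation, and the apps here are plain functions rather than WSGI callables.

```python
# Hypothetical longest-prefix dispatcher; not the Paste implementation.
def make_app(label):
    def app(path):
        return "%s handled %s" % (label, path or "/")
    return app


ROUTES = {
    "/v2": make_app("apiv2app"),
    "/v1": make_app("apiv1app"),
    "/": make_app("apiversions"),
}


def dispatch(path):
    """Pick the app whose prefix is the longest match for the path."""
    for prefix in sorted(ROUTES, key=len, reverse=True):
        stripped = prefix.rstrip("/")
        if path == stripped or path.startswith(stripped + "/"):
            # Hand the app only the part of the path after its prefix.
            return ROUTES[prefix](path[len(stripped):])
    raise LookupError(path)
```

With this mapping, dispatch("/v2/images") goes to the v2 application with the remaining path /images, which is where the router described next takes over.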

From the API() definition in glance/api/v2/router.py, the function that actually handles the POST request turns out to be the create method of the ImagesController class in glance/api/v2/images.py:

    def create(self, req, image, extra_properties, tags):
        image_factory = self.gateway.get_image_factory(req.context)
        image_repo = self.gateway.get_repo(req.context)
        try:
            image = image_factory.new_image(extra_properties=extra_properties,
                                            tags=tags, **image)
            image_repo.add(image)
        except (exception.DuplicateLocation,
                exception.Invalid) as e:
            raise webob.exc.HTTPBadRequest(explanation=e.msg)
        except (exception.ReservedProperty,
                exception.ReadonlyProperty) as e:
            raise webob.exc.HTTPForbidden(explanation=e.msg)
        except exception.Forbidden as e:
            LOG.debug("User not permitted to create image")
            raise webob.exc.HTTPForbidden(explanation=e.msg)
        except exception.LimitExceeded as e:
            LOG.warn(encodeutils.exception_to_unicode(e))
            raise webob.exc.HTTPRequestEntityTooLarge(
                explanation=e.msg, request=req, content_type='text/plain')
        except exception.Duplicate as e:
            raise webob.exc.HTTPConflict(explanation=e.msg)
        except exception.NotAuthenticated as e:
            raise webob.exc.HTTPUnauthorized(explanation=e.msg)
        except TypeError as e:
            LOG.debug(encodeutils.exception_to_unicode(e))
            raise webob.exc.HTTPBadRequest(explanation=e)
        return image

This code looks simple but hides a lot. If you follow gateway.get_image_factory and gateway.get_repo, you find that the authors lean heavily on the decorator and proxy patterns:

# glance/gateway.py

def get_image_factory(self, context):
    image_factory = glance.domain.ImageFactory()
    store_image_factory = glance.location.ImageFactoryProxy(
        image_factory, context, self.store_api, self.store_utils)
    quota_image_factory = glance.quota.ImageFactoryProxy(
        store_image_factory, context, self.db_api, self.store_utils)
    policy_image_factory = policy.ImageFactoryProxy(
        quota_image_factory, context, self.policy)
    notifier_image_factory = glance.notifier.ImageFactoryProxy(
        policy_image_factory, context, self.notifier)
    if property_utils.is_property_protection_enabled():
        property_rules = property_utils.PropertyRules(self.policy)
        pif = property_protections.ProtectedImageFactoryProxy(
            notifier_image_factory, context, property_rules)
        authorized_image_factory = authorization.ImageFactoryProxy(
            pif, context)
    else:
        authorized_image_factory = authorization.ImageFactoryProxy(
            notifier_image_factory, context)
    return authorized_image_factory

def get_repo(self, context):
    image_repo = glance.db.ImageRepo(context, self.db_api)
    store_image_repo = glance.location.ImageRepoProxy(
        image_repo, context, self.store_api, self.store_utils)
    quota_image_repo = glance.quota.ImageRepoProxy(
        store_image_repo, context, self.db_api, self.store_utils)
    policy_image_repo = policy.ImageRepoProxy(
        quota_image_repo, context, self.policy)
    notifier_image_repo = glance.notifier.ImageRepoProxy(
        policy_image_repo, context, self.notifier)
    if property_utils.is_property_protection_enabled():
        property_rules = property_utils.PropertyRules(self.policy)
        pir = property_protections.ProtectedImageRepoProxy(
            notifier_image_repo, context, property_rules)
        authorized_image_repo = authorization.ImageRepoProxy(
            pir, context)
    else:
        authorized_image_repo = authorization.ImageRepoProxy(
            notifier_image_repo, context)

    return authorized_image_repo

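It is easy to lose the logic here if you read carelessly; you have to peel it like an onion, one layer at a time. I won't record every step of the trace. After passing through each layer's checks, image_factory.new_image ends up in glance/domain/__init__.py and returns an instance of the Image class.

That instance is then passed to image_repo.add, which again passes through each layer's checks before reaching ImageRepo.add() in glance/db/__init__.py. That method ultimately calls image_create() in glance/db/sqlalchemy/api.py to create the new database record.

If no error occurs, the details of the newly created empty image are returned to the client.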
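
The layering is easier to see in miniature. The sketch below wraps a factory in two proxies in the same inside-out order as the gateway code; every class name is hypothetical and the real proxies do considerably more.

```python
# Minimal sketch of the layered-proxy idea; all class names are hypothetical.
class Image:
    def __init__(self, name):
        self.name = name
        self.status = "queued"


class ImageFactory:
    """Innermost layer: actually builds the object."""

    def new_image(self, **kwargs):
        return Image(**kwargs)


class PolicyFactoryProxy:
    """Checks permission, then delegates to the wrapped factory."""

    def __init__(self, base, allowed):
        self.base = base
        self.allowed = allowed

    def new_image(self, **kwargs):
        if not self.allowed:
            raise PermissionError("not allowed to create images")
        return self.base.new_image(**kwargs)


class NotifierFactoryProxy:
    """Delegates first, then records an event about the result."""

    def __init__(self, base, events):
        self.base = base
        self.events = events

    def new_image(self, **kwargs):
        image = self.base.new_image(**kwargs)
        self.events.append(("image.create", image.name))
        return image


def get_image_factory(allowed, events):
    # Wrap from the inside out: the last wrapper is the first one called.
    factory = ImageFactory()
    factory = PolicyFactoryProxy(factory, allowed)
    factory = NotifierFactoryProxy(factory, events)
    return factory


events = []
image = get_image_factory(True, events).new_image(name="cirros")
```

A call enters at the outermost wrapper and works its way inward, so when tracing new_image you read the wrappers in the reverse of the order in which they were constructed.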

upload

According to the documentation, image data is uploaded by sending a PUT request to /v2/images/{image_id}/file. The handler is the upload method of the ImageDataController class in glance/api/v2/image_data.py:

    ......
    @utils.mutating
    def upload(self, req, image_id, data, size):
        backend = None
        image_repo = self.gateway.get_repo(req.context)
        image = None
        refresher = None
        cxt = req.context
        try:
            self.policy.enforce(cxt, 'upload_image', {})
            image = image_repo.get(image_id)
            image.status = 'saving'
            try:
                image_repo.save(image, from_state='queued')
                image.set_data(data, size, backend=backend)
                try:
                    image_repo.save(image, from_state='saving')
                except exception.NotAuthenticated:
                    if refresher is not None:
                        # request a new token to update an image in database
                        cxt.auth_token = refresher.refresh_token()
                        image_repo = self.gateway.get_repo(req.context)
                        image_repo.save(image, from_state='saving')
                    else:
                        raise
        ......

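Recent versions of Glance can be configured with multiple backends, but the feature is not yet stable, so this walkthrough uses a single backend. The handler first changes the status to saving and then calls set_data. For reasons I won't go into, I could not run the service and step through it in a debugger, so I had to rely on the IDE's go-to-definition, which sent me in circles for a long time. I'll just give the answer: the method that ends up being called is set_data on the ImageProxy class in glance/location.py. The key point is that image is actually wrapped by an ImageProxy instance. The wrapping is done by a Helper class created when the proxy objects are initialized; it has a proxy method that converts a plain Image into an ImageProxy. I won't go into more detail here. Back to the definition of set_data: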

def set_data(self, data, size=None, backend=None):
    ......
    hashing_algo = CONF['hashing_algorithm']
    if CONF.enabled_backends:
        (location, size, checksum,
            multihash, loc_meta) = self.store_api.add_with_multihash(
            CONF,
            self.image.image_id,
            utils.LimitingReader(utils.CooperativeReader(data),
                                    CONF.image_size_cap),
            size,
            backend,
            hashing_algo,
            context=self.context,
            verifier=verifier)
    else:
        (location, size, checksum,
            multihash, loc_meta) = self.store_api.add_to_backend_with_multihash(
            CONF,
            self.image.image_id,
            utils.LimitingReader(utils.CooperativeReader(data),
                                    CONF.image_size_cap),
            size,
            hashing_algo,
            context=self.context,
            verifier=verifier)

    self.image.locations = [{'url': location, 'metadata': loc_meta, 'status': 'active'}]
    self.image.size = size
    self.image.checksum = checksum
    self.image.os_hash_value = multihash
    self.image.os_hash_algo = hashing_algo
    self.image.status = 'active'

By default store_api here is glance_store. add_with_multihash is called when multiple backends are enabled, and add_to_backend_with_multihash when a single backend is used. Taking the single-backend case:

def add_to_backend_with_multihash(conf, image_id, data, size, hashing_algo,
                                  scheme=None, context=None, verifier=None):
    if scheme is None:
        scheme = conf['glance_store']['default_store']
    store = get_store_from_scheme(scheme)
    return store_add_to_backend_with_multihash(
        image_id, data, size, hashing_algo, store, context, verifier)

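get_store_from_scheme fetches the driver that was bound into SCHEME_TO_CLS_MAP, as described at the start of this article. store_add_to_backend_with_multihash then calls that driver's add method. Taking Ceph's block storage, RBD (RADOS Block Device), as the example, the function lives in glance_store/_drivers/rbd.py: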

    @driver.back_compat_add
    @capabilities.check
    def add(self, image_id, image_file, image_size, hashing_algo, context=None,
            verifier=None):
        checksum = hashlib.md5()
        os_hash_value = hashlib.new(str(hashing_algo))
        image_name = str(image_id)
        with self.get_connection(conffile=self.conf_file,
                                 rados_id=self.user) as conn:
            fsid = None
            if hasattr(conn, 'get_fsid'):
                fsid = encodeutils.safe_decode(conn.get_fsid())
            with conn.open_ioctx(self.pool) as ioctx:
                order = int(math.log(self.WRITE_CHUNKSIZE, 2))
                LOG.debug('creating image %s with order %d and size %d',
                          image_name, order, image_size)
                if image_size == 0:
                    LOG.warning(_("since image size is zero we will be doing "
                                  "resize-before-write for each chunk which "
                                  "will be considerably slower than normal"))

                try:
                    loc = self._create_image(fsid, conn, ioctx, image_name,
                                             image_size, order)
                except rbd.ImageExists:
                    msg = _('RBD image %s already exists') % image_id
                    raise exceptions.Duplicate(message=msg)

                try:
                    with rbd.Image(ioctx, image_name) as image:
                        bytes_written = 0
                        offset = 0
                        chunks = utils.chunkreadable(image_file,
                                                     self.WRITE_CHUNKSIZE)
                        for chunk in chunks:
                            # If the image size provided is zero we need to do
                            # a resize for the amount we are writing. This will
                            # be slower so setting a higher chunk size may
                            # speed things up a bit.
                            if image_size == 0:
                                chunk_length = len(chunk)
                                length = offset + chunk_length
                                bytes_written += chunk_length
                                LOG.debug(_("resizing image to %s KiB") %
                                          (length / units.Ki))
                                image.resize(length)
                            LOG.debug(_("writing chunk at offset %s") %
                                      (offset))
                            offset += image.write(chunk, offset)
                            os_hash_value.update(chunk)
                            checksum.update(chunk)
                            if verifier:
                                verifier.update(chunk)
                        if loc.snapshot:
                            image.create_snap(loc.snapshot)
                            image.protect_snap(loc.snapshot)
        ......
        if image_size == 0:
            image_size = bytes_written
        metadata = {}
        if self.backend_group:
            metadata['store'] = u"%s" % self.backend_group
        return (loc.get_uri(),
                image_size,
                checksum.hexdigest(),
                os_hash_value.hexdigest(),
                metadata)

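This function first initializes the hash objects and opens a connection, then creates the RBD image (raising Duplicate if one with the same name already exists), writes the data chunk by chunk while updating the hashes, and finally returns the location, size, and digests to the caller. Glance then sets the status to active, and the image upload is complete.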
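
The core of that loop (read a chunk, write it at the current offset, feed the same bytes to both hashes) can be tried without a Ceph cluster. In this sketch a bytearray stands in for the RBD image, and the chunkreadable helper and the add signature are simplified, hypothetical versions of the ones above.

```python
import hashlib
import io


def chunkreadable(fileobj, chunk_size):
    """Yield successive chunks from a file-like object until it is exhausted."""
    while True:
        chunk = fileobj.read(chunk_size)
        if not chunk:
            break
        yield chunk


def add(image_file, image_size, hashing_algo="sha512", chunk_size=4):
    """Copy image_file into an in-memory buffer, hashing as it goes.

    The bytearray stands in for the RBD image. Growing it plays the role
    of resize-before-write when image_size is 0 (size unknown up front).
    """
    checksum = hashlib.md5()
    os_hash_value = hashlib.new(hashing_algo)
    target = bytearray(image_size)
    offset = 0
    for chunk in chunkreadable(image_file, chunk_size):
        end = offset + len(chunk)
        if image_size == 0:
            target.extend(b"\0" * (end - len(target)))  # "resize"
        target[offset:end] = chunk                      # "write at offset"
        offset = end
        checksum.update(chunk)
        os_hash_value.update(chunk)
    if image_size == 0:
        image_size = offset
    return (bytes(target), image_size,
            checksum.hexdigest(), os_hash_value.hexdigest())


data = b"0123456789"
stored, size, md5_hex, multihash = add(io.BytesIO(data), 0)
```

Because the hashes are updated per chunk, the digests match those of the whole payload without the data ever being held in memory twice.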

