Commit eb76a235 authored by Morten Knutsen's avatar Morten Knutsen

Move old files as is from nova_portal@bd3c11515a25f6bd4dd5071b1a660a72a7c4d5db.

from __future__ import absolute_import, division, print_function, unicode_literals
from urllib.parse import urlencode # pylint: disable=F0401,E0611
except ImportError:
from urllib import urlencode # pylint: disable=F0401,E0611
import requests
from requests.auth import HTTPBasicAuth
from lxml import etree
from configparser import SafeConfigParser # pylint: disable=F0401
except ImportError:
from ConfigParser import SafeConfigParser # pylint: disable=F0401
import uuid
from dateutil.parser import parse as tsparse
namespaces = {
'b': "",
'atom': "",
'ds': "",
'm': ""
def namespace_split(tagname):
if not '}' in tagname:
return ('', tagname)
namespace, tag = tagname.split('}')
namespace = namespace[1:]
return (namespace, tag)
def ns(namespace, name):
return "{" + namespaces[namespace] + "}" + name
def is_string(value):
return isinstance(value, basestring) # pylint: disable=E0602
except NameError:
return isinstance(value, str) # pylint: disable=E0602
def type_encode_argvalue(value):
if type(value) == uuid.UUID:
return "guid'{}'".format(value)
if is_string(value):
return "'{}'".format(value)
return value
class ODataException(Exception):
def __init__(self, error_type, error_message, full_error):
self.error_type = error_type
self.error_message = error_message
self.full_error = full_error
class Client(object):
passman = None
def __init__(self, baseurl, username, password):
self.baseurl = baseurl
self.auth = HTTPBasicAuth(username, password)
def _format_url(self, resource, query=None, args=None, subresource=None):
url = self.baseurl
fragments = {}
if resource:
url += "/" + resource
if args:
argstr = ",".join(("{key}={value}".format(key=key, value=type_encode_argvalue(value)) for key, value in args.items()))
url = "{url}({argstr})".format(url=url, argstr=argstr)
if subresource:
url += "/" + subresource
if query:
argstr = " and ".join(("{key} eq {value}".format(key=key, value=type_encode_argvalue(query[key])) for key in query))
fragments["$filter"] = argstr
if fragments:
url += "?" + urlencode(fragments)
return url
def _do_request(self, url, method="GET", reqdata=None):
headers = {}
if reqdata:
headers["content-type"] = "application/atom+xml;type=entry"
resp = requests.request(method, url, data=reqdata, headers=headers, auth=self.auth, verify=False)
data = resp.content
if not data:
return None
if resp.status_code >= 300:
error_doc = etree.fromstring(data)
errtype = ", ".join(error_doc.xpath('/m:error/m:innererror/m:type/text()', namespaces=namespaces))
errmsg = ", ".join(error_doc.xpath('/m:error/m:innererror/m:message/text()', namespaces=namespaces))
if not errmsg:
errmsg = error_doc.xpath('/m:error/m:message/text()', namespaces=namespaces)[0]
print("Request failed with error: " + errtype)
raise ODataException(errtype, errmsg, error_doc)
except etree.XMLSyntaxError as e2:
print("Did not get xml error from SPF")
raise e2
except AttributeError as e2:
return etree.fromstring(data)
except etree.XMLSyntaxError as e:
print("Content that caused the issue:")
raise e
def get_collections(self):
url = self._format_url(resource="")
collections = self._do_request(url)
return (collection for collection in collections.xpath('/b:service/b:workspace/b:collection/atom:title/text()', namespaces=namespaces))
def query_collection(self, collection, query=None, **kw_args):
url = self._format_url(collection, query, kw_args)
collection = self._do_request(url)
result = []
for entry in collection.xpath('/atom:feed/atom:entry', namespaces=namespaces):
result.append(Entry(entry, self))
return result
def get_from_collection(self, collection, subresource=None, **kw_args):
url = self._format_url(collection, query=None, args=kw_args, subresource=subresource)
collection = self._do_request(url)
for entry in collection.xpath('/atom:entry', namespaces=namespaces):
return Entry(entry, self)
return None
def delete_from_collection(self, collection, subresource=None, **kw_args):
url = self._format_url(collection, query=None, args=kw_args, subresource=subresource)
self._do_request(url, method="DELETE")
def delete_resource(self, res):
url = self.baseurl + "/" + res
self._do_request(url, method="DELETE")
def get_resource(self, url):
repl = self._do_request(self.baseurl + "/" + url)
if repl is None:
return []
if repl.tag == ns('atom', 'feed'):
return [Entry(entry, self) for entry in repl.xpath('/atom:feed/atom:entry', namespaces=namespaces)]
return Entry(repl, self)
def create_link(self, links_collection, target):
url = self.baseurl + "/" + links_collection
entry, obj = create_entry()
add_property(obj, "ID", target)
res = self._do_request(url, "POST", etree.tostring(entry))
return res
class Entry(object):
def __init__(self, element, vmm_client):
self.vmm_client = vmm_client
self.element = element = self.parse_properties(element)
self.resources = self.parse_resources(element)
def get_resource(self, resource):
return self.vmm_client.get_resource(self.resources[resource])
def parse_resources(self, element):
resources = {}
for link in element.xpath("atom:link", namespaces=namespaces):
rel = link.get('rel')
if rel == "edit":
title = link.get('title')
href = link.get('href')
resources[title] = href
return resources
def parse_property(self, prop):
inttypes = set(("Edm.Int32", "Edm.Int64", "Edm.Byte", "Edm.Int16"))
collectiontypes = set(("VMM.ErrorInfo", "VMM.UserAndRole", "VMM.VPNVMNetworkGateway"))
if prop.get(ns("m", "null")) == "true":
return None
property_type = prop.get(ns("m", "type"))
if not property_type:
return prop.text
if property_type == "Edm.Boolean":
if prop.text == "true":
return True
return False
if property_type in inttypes:
return int(prop.text)
if property_type == "Edm.Guid":
return uuid.UUID(prop.text)
if property_type == "Edm.DateTime":
return tsparse(prop.text)
if property_type in collectiontypes or property_type.startswith("Collection("):
return self.parse_properties(prop, "*")
def parse_properties(self, prop, xpath="atom:content/m:properties/*"):
element = {}
for tag in prop.xpath(xpath, namespaces=namespaces):
(namespace, tagname) = namespace_split(tag.tag)
if namespace == namespaces['ds']:
key = tagname
key = tag.tag
element[key] = self.parse_property(tag)
return element
def create_entry():
entry = etree.Element(ns('atom', 'entry'))
content = etree.SubElement(entry, ns('atom', 'content'))
content.set("type", "application/xml")
vm = etree.SubElement(content, ns('m', "properties"))
return entry, vm
def add_property(tag, key, value, datatype=None):
prop = etree.SubElement(tag, ns('ds', key))
if type(value) == uuid.UUID:
prop.set(ns('m', 'type'), "Edm.Guid")
prop.text = str(value)
elif type(value) == bool:
prop.set(ns('m', 'type'), "Edm.Boolean")
if value:
prop.text = "true"
prop.text = "false"
elif type(value) == int:
prop.set(ns('m', 'type'), "Edm.Int64")
prop.text = value
prop.text = value
if datatype:
prop.set(ns('m', 'type'), datatype)
return prop
class VMMClient(Client):
def create_vm(self, StampId, Name, CloudId, **kwargs):
entry, vm = create_entry()
add_property(vm, "StampId", StampId)
add_property(vm, "CloudId", CloudId)
add_property(vm, "Name", Name)
add_property(vm, "NewVirtualNetworkAdapterInput", "", datatype="Collection(VMM.NewVMVirtualNetworkAdapterInput)")
for key, value in kwargs.items():
if value == "":
print("skipping empty value")
add_property(vm, key, value)
# print(etree.tostring(entry, pretty_print=True))
result = Entry(self._do_request(self._format_url("VirtualMachines"), method="POST", reqdata=etree.tostring(entry)), self)
return result
def get_vm(self, stampid, vmid):
return self.get_from_collection("VirtualMachines", StampId=stampid, ID=vmid)
def delete_vm(self, stampid, vmid):
self.delete_from_collection("VirtualMachines", StampId=stampid, ID=vmid)
def vm_operation(self, stampid, vmid, operation):
url = self._format_url("VirtualMachines", args={"StampId": stampid, "ID": vmid})
entry, vm = create_entry()
add_property(vm, "StampId", stampid)
add_property(vm, "ID", vmid)
add_property(vm, "Operation", operation)
self._do_request(url, method="PATCH", reqdata=etree.tostring(entry))
def get_vm_addresses(self, stampid, vmid):
addresses = []
guest_infos = self.query_collection("GuestInfos", query={"StampId": stampid, "VMId": vmid})
for guest_info in guest_infos:
addresses +=["IPv4Addresses"].split(";")
addresses = [a for a in addresses if a]
addresses = [a for a in addresses if a != ""]
return addresses
def get_job(self, stampid, jobid):
return self.get_from_collection("Jobs", StampId=stampid, ID=jobid)
def get_config():
cfg = SafeConfigParser()"nova.cfg")
return cfg
NEEDS_STAMPID_VMID = ("GuestInfos", "PerformanceData", )
NEED_MORE = ("QuotaAndUsageComponents",)
# encoding: utf-8
from iaas_portal import odata
import uuid
class TestOData(object):
def setup(self):
def teardown(self):
def test_ns_split(self):
""" Test splitting tag and namespace, normal flow """
tag = "{}mytag"
namespace, tag = odata.namespace_split(tag)
assert namespace == ""
assert tag == "mytag"
def test_ns_split_no_namespace(self):
""" Test splitting tag and namespace when no namespace is provided """
tag = "mytag"
namespace, res_tag = odata.namespace_split(tag)
assert namespace == ""
assert res_tag == tag
def test_ns(self):
""" test constructing namespaced tag names """
assert odata.ns("b", "mytag") == "{}mytag"
assert odata.ns("atom", "mytag2") == "{}mytag2"
assert odata.ns("ds", "sometag") == "{}sometag"
assert odata.ns("m", "mytag") == "{}mytag"
def test_type_encode_argvalue(self):
uuid_string = '00010203-0405-0607-0809-0a0b0c0d0e0f'
guid = uuid.UUID(uuid_string)
assert odata.type_encode_argvalue(guid) == "guid'" + uuid_string + "'"
assert odata.type_encode_argvalue("mystring") == "'mystring'"
# assert odata.type_encode_argvalue(u"blåbærsyltetøy") = u"'blåbærsyltetøy'"
assert odata.type_encode_argvalue(123) == 123
class TestODataClient(object):
def setup(self):
self.client = odata.Client("http://someurl", "myusername", "mytopsecretpassword")
def teardown(self):
def test_format_url(self):
""" Test the format url call """
assert self.client._format_url("foo") == "http://someurl/foo"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment