Fixing and refactoring of is directory resource methods
This commit is contained in:
parent
5a397dec9b
commit
378afe5890
3 changed files with 78 additions and 81 deletions
|
@ -7,6 +7,10 @@ But uses `requests` instead of `PyCURL`
|
|||
|
||||
Release Notes
|
||||
=============
|
||||
Version 0.4 - TBD
|
||||
* Refactoring of WebDAV client and making it works in following methods:
|
||||
- Checking is remote resource directory
|
||||
|
||||
Version 0.3 - 18.10.2017
|
||||
* Refactoring of WebDAV client and making it works in following methods:
|
||||
- Getting of WebDAV resource property value
|
||||
|
|
2
setup.py
2
setup.py
|
@ -55,7 +55,7 @@ setup(
|
|||
long_description=open('README.rst').read(),
|
||||
author='Evgeny Ezhov',
|
||||
author_email='ezhov.evgeny@gmail.com',
|
||||
url='https://github.com/ezhov-evgeny/webdav-client-python-2',
|
||||
url='https://github.com/ezhov-evgeny/webdav-client-python-3',
|
||||
license='MIT License',
|
||||
keywords='webdav, client, python, module, library, packet, Yandex.Disk, Dropbox, Google Disk, Box, 4shared',
|
||||
classifiers=[
|
||||
|
|
|
@ -131,6 +131,14 @@ class Client(object):
|
|||
url = {'hostname': self.webdav.hostname, 'root': self.webdav.root, 'path': path}
|
||||
return "{hostname}{root}{path}".format(**url)
|
||||
|
||||
def get_full_path(self, urn):
|
||||
"""Generates full path to remote resource exclude hostname.
|
||||
|
||||
:param urn: the URN to resource.
|
||||
:return: full path to resource with root path.
|
||||
"""
|
||||
return "{root}{path}".format(root=self.webdav.root, path=urn.path())
|
||||
|
||||
def execute_request(self, action, path, data=None, headers_ext=None):
|
||||
"""Generate request to WebDAV server for specified action and path and execute it.
|
||||
|
||||
|
@ -232,7 +240,7 @@ class Client(object):
|
|||
response = self.execute_request(action='list', path=directory_urn.quote())
|
||||
urns = WebDavXmlUtils.parse_get_list_response(response.content)
|
||||
|
||||
path = "{root}{path}".format(root=self.webdav.root, path=directory_urn.path())
|
||||
path = self.get_full_path(directory_urn)
|
||||
return [urn.filename() for urn in urns if urn.path() != path and urn.path() != path[:-1]]
|
||||
|
||||
@wrap_connection_error
|
||||
|
@ -507,7 +515,7 @@ class Client(object):
|
|||
if not self.check(urn_to.parent()):
|
||||
raise RemoteParentNotFound(urn_to.path())
|
||||
|
||||
header_destination = "Destination: {root}{path}".format(root=self.webdav.root, path=urn_to.path())
|
||||
header_destination = "Destination: {path}".format(path=self.get_full_path(urn_to))
|
||||
self.execute_request(action='copy', path=urn_from.quote(), headers_ext=[header_destination])
|
||||
|
||||
@wrap_connection_error
|
||||
|
@ -527,7 +535,7 @@ class Client(object):
|
|||
if not self.check(urn_to.parent()):
|
||||
raise RemoteParentNotFound(urn_to.path())
|
||||
|
||||
header_destination = "Destination: {root}{path}".format(root=self.webdav.root, path=urn_to.path())
|
||||
header_destination = "Destination: {path}".format(path=self.get_full_path(urn_to))
|
||||
header_overwrite = "Overwrite: {flag}".format(flag="T" if overwrite else "F")
|
||||
self.execute_request(action='move', path=urn_from.quote(), headers_ext=[header_destination, header_overwrite])
|
||||
|
||||
|
@ -559,67 +567,26 @@ class Client(object):
|
|||
raise RemoteResourceNotFound(remote_path)
|
||||
|
||||
response = self.execute_request(action='info', path=urn.quote())
|
||||
path = "{root}{path}".format(root=self.webdav.root, path=urn.path())
|
||||
path = self.get_full_path(urn)
|
||||
return WebDavXmlUtils.parse_info_response(content=response.content, path=path, hostname=self.webdav.hostname)
|
||||
|
||||
# TODO refactor code below and write tests for it.
|
||||
@wrap_connection_error
|
||||
def is_dir(self, remote_path):
|
||||
"""Checks is the remote resource directory.
|
||||
More information you can find by link http://webdav.org/specs/rfc4918.html#METHOD_PROPFIND
|
||||
|
||||
def parse(response, path):
|
||||
|
||||
try:
|
||||
response_str = response.content
|
||||
tree = etree.fromstring(response_str)
|
||||
|
||||
resps = tree.findall("{DAV:}response")
|
||||
|
||||
for resp in resps:
|
||||
href = resp.findtext("{DAV:}href")
|
||||
urn = unquote(href)
|
||||
|
||||
if path[-1] == Urn.separate:
|
||||
if not path == urn:
|
||||
continue
|
||||
else:
|
||||
path_with_sep = "{path}{sep}".format(path=path, sep=Urn.separate)
|
||||
if not path == urn and not path_with_sep == urn:
|
||||
continue
|
||||
type = resp.find(".//{DAV:}resourcetype")
|
||||
if type is None:
|
||||
raise MethodNotSupported(name="is_dir", server=self.webdav.hostname)
|
||||
dir_type = type.find("{DAV:}collection")
|
||||
|
||||
return True if dir_type is not None else False
|
||||
|
||||
raise RemoteResourceNotFound(path)
|
||||
|
||||
except etree.XMLSyntaxError:
|
||||
raise MethodNotSupported(name="is_dir", server=self.webdav.hostname)
|
||||
|
||||
:param remote_path: the path to remote resource.
|
||||
:return: True in case the remote resource is directory and False otherwise.
|
||||
"""
|
||||
urn = Urn(remote_path)
|
||||
parent_urn = Urn(urn.parent())
|
||||
if not self.check(urn.path()) and not self.check(Urn(remote_path, directory=True).path()):
|
||||
raise RemoteResourceNotFound(remote_path)
|
||||
|
||||
response = BytesIO()
|
||||
|
||||
url = {'hostname': self.webdav.hostname, 'root': self.webdav.root, 'path': parent_urn.quote()}
|
||||
options = {
|
||||
'URL': "{hostname}{root}{path}".format(**url),
|
||||
'CUSTOMREQUEST': Client.requests['info'],
|
||||
}
|
||||
|
||||
response = requests.request(
|
||||
options["CUSTOMREQUEST"],
|
||||
options["URL"],
|
||||
auth=(self.webdav.login, self.webdav.password),
|
||||
headers=self.get_headers('info', None)
|
||||
)
|
||||
|
||||
path = "{root}{path}".format(root=self.webdav.root, path=urn.path())
|
||||
|
||||
return parse(response, path)
|
||||
response = self.execute_request(action='info', path=parent_urn.quote())
|
||||
path = self.get_full_path(urn)
|
||||
return WebDavXmlUtils.parse_is_dir_response(content=response.content, path=path, hostname=self.webdav.hostname)
|
||||
|
||||
@wrap_connection_error
|
||||
def get_property(self, remote_path, option):
|
||||
|
@ -880,36 +847,34 @@ class WebDavXmlUtils:
|
|||
`size`: size of resource,
|
||||
`modified`: date of resource modification.
|
||||
"""
|
||||
try:
|
||||
tree = etree.fromstring(content)
|
||||
find_attributes = {
|
||||
'created': ".//{DAV:}creationdate",
|
||||
'name': ".//{DAV:}displayname",
|
||||
'size': ".//{DAV:}getcontentlength",
|
||||
'modified': ".//{DAV:}getlastmodified"
|
||||
}
|
||||
response = WebDavXmlUtils.extract_response_for_path(content=content, path=path, hostname=hostname)
|
||||
find_attributes = {
|
||||
'created': ".//{DAV:}creationdate",
|
||||
'name': ".//{DAV:}displayname",
|
||||
'size': ".//{DAV:}getcontentlength",
|
||||
'modified': ".//{DAV:}getlastmodified"
|
||||
}
|
||||
info = dict()
|
||||
for (name, value) in find_attributes.items():
|
||||
info[name] = response.findtext(value)
|
||||
return info
|
||||
|
||||
responses = tree.findall("{DAV:}response")
|
||||
for resp in responses:
|
||||
href = resp.findtext("{DAV:}href")
|
||||
urn = unquote(href)
|
||||
@staticmethod
|
||||
def parse_is_dir_response(content, path, hostname):
|
||||
"""Parses of response content XML from WebDAV server and extract an information about resource.
|
||||
|
||||
if path[-1] == Urn.separate:
|
||||
if not path == urn:
|
||||
continue
|
||||
else:
|
||||
path_with_sep = "{path}{sep}".format(path=path, sep=Urn.separate)
|
||||
if not path == urn and not path_with_sep == urn:
|
||||
continue
|
||||
:param content: the XML content of HTTP response from WebDAV server.
|
||||
:param path: the path to resource.
|
||||
:param hostname: the server hostname.
|
||||
:return: True in case the remote resource is directory and False otherwise.
|
||||
"""
|
||||
response = WebDavXmlUtils.extract_response_for_path(content=content, path=path, hostname=hostname)
|
||||
type = response.find(".//{DAV:}resourcetype")
|
||||
if type is None:
|
||||
raise MethodNotSupported(name="is_dir", server=self.webdav.hostname)
|
||||
dir_type = type.find("{DAV:}collection")
|
||||
|
||||
info = dict()
|
||||
for (name, value) in find_attributes.items():
|
||||
info[name] = resp.findtext(value)
|
||||
return info
|
||||
|
||||
raise RemoteResourceNotFound(path)
|
||||
except etree.XMLSyntaxError:
|
||||
raise MethodNotSupported(name="info", server=hostname)
|
||||
return True if dir_type is not None else False
|
||||
|
||||
@staticmethod
|
||||
def create_get_property_request_content(option):
|
||||
|
@ -965,3 +930,31 @@ class WebDavXmlUtils:
|
|||
buff = BytesIO()
|
||||
tree.write(buff, xml_declaration=True, encoding='UTF-8')
|
||||
return buff.getvalue()
|
||||
|
||||
@staticmethod
|
||||
def extract_response_for_path(content, path, hostname):
|
||||
"""Extracts single response for specified remote resource.
|
||||
|
||||
:param content: raw content of response as string.
|
||||
:param path: the path to needed remote resource.
|
||||
:param hostname: the server hostname.
|
||||
:return: XML object of response for the remote resource defined by path.
|
||||
"""
|
||||
try:
|
||||
tree = etree.fromstring(content)
|
||||
responses = tree.findall("{DAV:}response")
|
||||
for resp in responses:
|
||||
href = resp.findtext("{DAV:}href")
|
||||
urn = unquote(href)
|
||||
|
||||
if path[-1] == Urn.separate:
|
||||
if not path == urn:
|
||||
continue
|
||||
else:
|
||||
path_with_sep = "{path}{sep}".format(path=path, sep=Urn.separate)
|
||||
if not path == urn and not path_with_sep == urn:
|
||||
continue
|
||||
return resp
|
||||
raise RemoteResourceNotFound(path)
|
||||
except etree.XMLSyntaxError:
|
||||
raise MethodNotSupported(name="is_dir", server=hostname)
|
||||
|
|
Loading…
Add table
Reference in a new issue