Compare commits

..

No commits in common. "develop" and "v0.14" have entirely different histories.

12 changed files with 93 additions and 291 deletions

View file

@ -4,16 +4,15 @@ dist: xenial
addons:
sonarcloud:
organization: "ezhov-evgeny"
token: f0f714f3bea6bd103e3eb82724ef3bb0d3b54d1d
token:
secure: f0f714f3bea6bd103e3eb82724ef3bb0d3b54d1d
services:
- docker
python:
- "3.5"
- "3.6"
- "2.7"
- "3.7"
- "3.8"
before_install:
- docker pull bytemark/webdav
@ -26,7 +25,4 @@ install:
script:
- coverage run setup.py test
- coverage xml
- |
if [[ $TRAVIS_PYTHON_VERSION == "3.8" ]]; then
sonar-scanner
fi
- sonar-scanner

View file

@ -33,12 +33,12 @@ client.execute_request("mkdir", 'directory_name')
Webdav API
==========
Webdav API is a set of webdav actions of work with cloud storage. This set includes the following actions:
Webdav API is a set of webdav methods of work with cloud storage. This set includes the following methods:
`check`, `free`, `info`, `list`, `mkdir`, `clean`, `copy`, `move`, `download`, `upload`, `publish` and `unpublish`.
**Configuring the client**
Required keys for configuring client connection with WevDAV-server are `webdav_hostname` and `webdav_login`, `webdav_password`.
Required keys for configuring client connection with WevDAV-server are webdav\_hostname and webdav\_login, webdav\_password.
```python
from webdav3.client import Client
@ -51,25 +51,6 @@ options = {
client = Client(options)
```
If your server does not support `HEAD` method or there are other reasons to override default WebDAV methods for actions use a dictionary option `webdav_override_methods`.
The key should be in the following list: `check`, `free`, `info`, `list`, `mkdir`, `clean`, `copy`, `move`, `download`, `upload`,
`publish` and `unpublish`. The value should a string name of WebDAV method, for example `GET`.
```python
from webdav3.client import Client
options = {
'webdav_hostname': "https://webdav.server.ru",
'webdav_login': "login",
'webdav_password': "password",
'webdav_override_methods': {
'check': 'GET'
}
}
client = Client(options)
```
When a proxy server you need to specify settings to connect through it.
```python
@ -298,16 +279,7 @@ res1.write_async(local_path="~/Downloads/file1", callback)
Release Notes
-------------
**Version 3.14.1**
* Fixed issue during coping and moving files with cyrillic names
* Support OAuth2 bearer tokens by https://github.com/danielloader
**Version 3.14**
* Override methods for customizing communication with WebDAV servers
* Support multiple clients simultaneously
* Sync modified files during pull and push by https://github.com/mont5piques
**Version 0.14**
**Version 0.14 TBD**
* Fixed an issue with checking resources on Yandex WebDAV server
**Version 0.13 27.11.2019**

View file

@ -6,7 +6,7 @@ from setuptools import setup, find_packages
from setuptools.command.install import install as InstallCommand
from setuptools.command.test import test as TestCommand
version = "3.14.1"
version = "0.14"
requirements = "libxml2-dev libxslt-dev python-dev"
@ -51,8 +51,8 @@ setup(
name='webdavclient3',
version=version,
packages=find_packages(),
requires=['python (>= 3.3.0)'],
install_requires=['requests', 'lxml', 'python-dateutil'],
requires=['python (>= 2.7.6)'],
install_requires=['requests', 'lxml', 'argcomplete'],
scripts=['wdc'],
test_suite='tests',
tests_require=['pytest'],
@ -73,10 +73,8 @@ setup(
'Operating System :: MacOS',
'Operating System :: Microsoft',
'Operating System :: Unix',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Topic :: Internet',
'Topic :: Software Development :: Libraries :: Python Modules',
],

View file

@ -13,7 +13,6 @@ class BaseClientTestCase(unittest.TestCase):
remote_path_dir = 'test_dir'
remote_path_dir2 = 'test_dir2'
remote_inner_path_dir = 'test_dir/inner'
inner_dir_name = 'inner'
local_base_dir = 'tests/'
local_file = 'test.txt'
local_file_path = local_base_dir + 'test.txt'
@ -22,10 +21,7 @@ class BaseClientTestCase(unittest.TestCase):
options = {
'webdav_hostname': 'http://localhost:8585',
'webdav_login': 'alice',
'webdav_password': 'secret1234',
'webdav_override_methods': {
'check': 'GET'
}
'webdav_password': 'secret1234'
}
# options = {
@ -36,39 +32,29 @@ class BaseClientTestCase(unittest.TestCase):
def setUp(self):
self.client = Client(self.options)
self.clean_local_dir(self.local_path_dir)
if path.exists(path=self.local_path_dir):
shutil.rmtree(path=self.local_path_dir)
def tearDown(self):
self.clean_local_dir(self.local_path_dir)
self.clean_remote_dir(self.remote_path_dir)
self.clean_remote_dir(self.remote_path_dir2)
if path.exists(path=self.local_path_dir):
shutil.rmtree(path=self.local_path_dir)
if self.client.check(remote_path=self.remote_path_dir):
self.client.clean(remote_path=self.remote_path_dir)
if self.client.check(remote_path=self.remote_path_dir2):
self.client.clean(remote_path=self.remote_path_dir2)
def clean_remote_dir(self, remote_path_dir):
if self.client.check(remote_path=remote_path_dir):
self.client.clean(remote_path=remote_path_dir)
@staticmethod
def clean_local_dir(local_path_dir):
if path.exists(path=local_path_dir):
shutil.rmtree(path=local_path_dir)
def _prepare_for_downloading(self, inner_dir=False, base_path=''):
if base_path:
self._create_remote_dir_if_needed(base_path)
self._prepare_dir_for_downloading(base_path + self.remote_path_dir, base_path + self.remote_path_file, self.local_file_path)
def _prepare_for_downloading(self, inner_dir=False):
if not self.client.check(remote_path=self.remote_path_dir):
self.client.mkdir(remote_path=self.remote_path_dir)
if not self.client.check(remote_path=self.remote_path_file):
self.client.upload_file(remote_path=self.remote_path_file, local_path=self.local_file_path)
if not path.exists(self.local_path_dir):
os.makedirs(self.local_path_dir)
if inner_dir:
self._prepare_dir_for_downloading(base_path + self.remote_inner_path_dir, base_path + self.remote_inner_path_file, self.local_file_path)
def _prepare_dir_for_downloading(self, remote_path_dir, remote_path_file, local_file_path):
self._create_remote_dir_if_needed(remote_path_dir)
if not self.client.check(remote_path=remote_path_file):
self.client.upload_file(remote_path=remote_path_file, local_path=local_file_path)
def _create_remote_dir_if_needed(self, remote_dir):
if not self.client.check(remote_path=remote_dir):
self.client.mkdir(remote_path=remote_dir)
if not self.client.check(remote_path=self.remote_inner_path_dir):
self.client.mkdir(remote_path=self.remote_inner_path_dir)
if not self.client.check(remote_path=self.remote_inner_path_file):
self.client.upload_file(remote_path=self.remote_inner_path_file, local_path=self.local_file_path)
def _prepare_for_uploading(self):
if not self.client.check(remote_path=self.remote_path_dir):

View file

@ -1,5 +1,4 @@
import os.path
import shutil
import unittest
from io import BytesIO, StringIO
from os import path
@ -10,7 +9,6 @@ from webdav3.exceptions import MethodNotSupported, OptionNotValid, RemoteResourc
class ClientTestCase(BaseClientTestCase):
pulled_file = BaseClientTestCase.local_path_dir + os.sep + BaseClientTestCase.local_file
def test_list(self):
self._prepare_for_downloading()
@ -208,10 +206,10 @@ class ClientTestCase(BaseClientTestCase):
self._prepare_for_downloading(True)
self.client.pull(self.remote_path_dir, self.local_path_dir)
self.assertTrue(path.exists(self.local_path_dir), 'Expected the directory is downloaded.')
self.assertTrue(path.exists(self.local_path_dir + os.path.sep + self.inner_dir_name), 'Expected the directory is downloaded.')
self.assertTrue(path.exists(self.local_path_dir + os.path.sep + self.inner_dir_name), 'Expected the directory is downloaded.')
self.assertTrue(path.exists(self.local_path_dir + os.path.sep + 'inner'), 'Expected the directory is downloaded.')
self.assertTrue(path.exists(self.local_path_dir + os.path.sep + 'inner'), 'Expected the directory is downloaded.')
self.assertTrue(path.isdir(self.local_path_dir), 'Expected this is a directory.')
self.assertTrue(path.isdir(self.local_path_dir + os.path.sep + self.inner_dir_name), 'Expected this is a directory.')
self.assertTrue(path.isdir(self.local_path_dir + os.path.sep + 'inner'), 'Expected this is a directory.')
self.assertTrue(path.exists(self.local_path_dir + os.path.sep + self.local_file),
'Expected the file is downloaded')
self.assertTrue(path.isfile(self.local_path_dir + os.path.sep + self.local_file),
@ -226,58 +224,6 @@ class ClientTestCase(BaseClientTestCase):
def test_valid(self):
self.assertTrue(self.client.valid())
def test_check_is_overridden(self):
self.assertEqual('GET', self.client.requests['check'])
def test_pull_newer(self):
init_modification_time = int(self._prepare_local_test_file_and_get_modification_time())
sleep(1)
self._prepare_for_downloading(base_path='time/')
result = self.client.pull('time/' + self.remote_path_dir, self.local_path_dir)
update_modification_time = int(os.path.getmtime(self.pulled_file))
self.assertTrue(result)
self.assertGreater(update_modification_time, init_modification_time)
self.client.clean(remote_path='time/' + self.remote_path_dir)
def test_pull_older(self):
self._prepare_for_downloading(base_path='time/')
sleep(1)
init_modification_time = int(self._prepare_local_test_file_and_get_modification_time())
result = self.client.pull('time/' + self.remote_path_dir, self.local_path_dir)
update_modification_time = int(os.path.getmtime(self.pulled_file))
self.assertFalse(result)
self.assertEqual(update_modification_time, init_modification_time)
self.client.clean(remote_path='time/' + self.remote_path_dir)
def test_push_newer(self):
self._prepare_for_downloading(base_path='time/')
sleep(1)
self._prepare_for_uploading()
init_modification_time = self.client.info('time/' + self.remote_path_file)['modified']
result = self.client.push('time/' + self.remote_path_dir, self.local_path_dir)
update_modification_time = self.client.info('time/' + self.remote_path_file)['modified']
self.assertTrue(result)
self.assertNotEqual(init_modification_time, update_modification_time)
self.client.clean(remote_path='time/' + self.remote_path_dir)
def test_push_older(self):
self._prepare_for_uploading()
sleep(1)
self._prepare_for_downloading(base_path='time/')
init_modification_time = self.client.info('time/' + self.remote_path_file)['modified']
result = self.client.push('time/' + self.remote_path_dir, self.local_path_dir)
update_modification_time = self.client.info('time/' + self.remote_path_file)['modified']
self.assertFalse(result)
self.assertEqual(init_modification_time, update_modification_time)
self.client.clean(remote_path='time/' + self.remote_path_dir)
def _prepare_local_test_file_and_get_modification_time(self):
if not path.exists(path=self.local_path_dir):
os.mkdir(self.local_path_dir)
if not path.exists(path=self.local_path_dir + os.sep + self.local_file):
shutil.copy(src=self.local_file_path, dst=self.pulled_file)
return os.path.getmtime(self.pulled_file)
if __name__ == '__main__':
unittest.main()

View file

@ -1,23 +0,0 @@
import os
import unittest
from tests.test_client_it import ClientTestCase
class MultiClientTestCase(ClientTestCase):
remote_path_file = 'директория/тестовый.txt'
remote_path_file2 = 'директория/тестовый2.txt'
remote_inner_path_file = 'директория/вложенная/тестовый.txt'
remote_path_dir = 'директория'
remote_path_dir2 = 'директория2'
remote_inner_path_dir = 'директория/вложенная'
inner_dir_name = 'вложенная'
local_base_dir = 'tests/'
local_file = 'тестовый.txt'
local_file_path = local_base_dir + 'тестовый.txt'
local_path_dir = local_base_dir + 'res/директория'
pulled_file = local_path_dir + os.sep + local_file
if __name__ == '__main__':
unittest.main()

View file

@ -1,26 +0,0 @@
import unittest
from tests.test_client_it import ClientTestCase
from webdav3.client import Client
class MultiClientTestCase(ClientTestCase):
options2 = {
'webdav_hostname': 'https://wrong.url.ru',
'webdav_login': 'webdavclient.test2',
'webdav_password': 'Qwerty123!',
'webdav_override_methods': {
'check': 'FAKE',
'download': 'FAKE',
'upload': 'FAKE',
'clean': 'FAKE',
}
}
def setUp(self):
super(ClientTestCase, self).setUp()
self.second_client = Client(self.options2)
if __name__ == '__main__':
unittest.main()

View file

@ -1,26 +0,0 @@
import unittest
from tests.test_client_it import ClientTestCase
class TailingSlashClientTestCase(ClientTestCase):
options = {
'webdav_hostname': 'http://localhost:8585/',
'webdav_login': 'alice',
'webdav_password': 'secret1234',
'webdav_override_methods': {
'check': 'GET'
}
}
def test_list_inner(self):
self._prepare_for_downloading(True)
file_list = self.client.list(self.remote_inner_path_dir)
self.assertIsNotNone(file_list, 'List of files should not be None')
def test_hostname_no_tailing_slash(self):
self.assertEqual('5', self.client.webdav.hostname[-1])
if __name__ == '__main__':
unittest.main()

View file

@ -1 +0,0 @@
test content for testing of webdav client

View file

@ -10,7 +10,6 @@ from re import sub
import lxml.etree as etree
import requests
from dateutil import parser as dateutil_parser
from webdav3.connection import *
from webdav3.exceptions import *
@ -90,8 +89,11 @@ class Client(object):
# controls whether to verify the server's TLS certificate or not
verify = True
# Sets the session for subsequent requests
session = requests.Session()
# HTTP headers for different actions
default_http_header = {
http_header = {
'list': ["Accept: */*", "Depth: 1"],
'free': ["Accept: */*", "Depth: 0", "Content-Type: text/xml"],
'copy': ["Accept: */*"],
@ -105,7 +107,7 @@ class Client(object):
}
# mapping of actions to WebDAV methods
default_requests = {
requests = {
'options': 'OPTIONS',
'download': "GET",
'upload': "PUT",
@ -148,13 +150,13 @@ class Client(object):
`webdav_verbose`: (optional) set verbose mode on.off. By default verbose mode is off.
"""
self.session = requests.Session()
self.http_header = Client.default_http_header.copy()
self.requests = Client.default_requests.copy()
webdav_options = get_options(option_type=WebDAVSettings, from_options=options)
self.webdav = WebDAVSettings(webdav_options)
self.requests.update(self.webdav.override_methods)
response = self.execute_request('options', '')
self.supported_methods = response.headers.get('Allow')
if 'HEAD' not in self.supported_methods:
self.requests['check'] = 'GET'
self.default_options = {}
def get_headers(self, action, headers_ext=None):
@ -165,11 +167,11 @@ class Client(object):
the specified action.
:return: the dictionary of headers for specified action.
"""
if action in self.http_header:
if action in Client.http_header:
try:
headers = self.http_header[action].copy()
headers = Client.http_header[action].copy()
except AttributeError:
headers = self.http_header[action][:]
headers = Client.http_header[action][:]
else:
headers = list()
@ -177,7 +179,7 @@ class Client(object):
headers.extend(headers_ext)
if self.webdav.token:
webdav_token = "Authorization: Bearer {token}".format(token=self.webdav.token)
webdav_token = "Authorization: OAuth {token}".format(token=self.webdav.token)
headers.append(webdav_token)
return dict([map(lambda s: s.strip(), i.split(':', 1)) for i in headers])
@ -212,12 +214,11 @@ class Client(object):
if self.session.auth:
self.session.request(method="GET", url=self.webdav.hostname, verify=self.verify) # (Re)Authenticates against the proxy
response = self.session.request(
method=self.requests[action],
method=Client.requests[action],
url=self.get_url(path),
auth=(self.webdav.login, self.webdav.password) if (not self.webdav.token and not self.session.auth) else None,
auth=(self.webdav.login, self.webdav.password),
headers=self.get_headers(action, headers_ext),
timeout=self.timeout,
cert=(self.webdav.cert_path, self.webdav.key_path) if (self.webdav.cert_path and self.webdav.key_path) else None,
data=data,
stream=True,
verify=self.verify
@ -536,7 +537,7 @@ class Client(object):
raise RemoteParentNotFound(urn_to.path())
headers = [
"Destination: {url}".format(url=self.get_url(urn_to.quote()))
"Destination: {url}".format(url=self.get_url(urn_to))
]
if self.is_dir(urn_from.path()):
headers.append("Depth: {depth}".format(depth=depth))
@ -559,7 +560,7 @@ class Client(object):
if not self.check(urn_to.parent()):
raise RemoteParentNotFound(urn_to.path())
header_destination = "Destination: {path}".format(path=self.get_url(urn_to.quote()))
header_destination = "Destination: {path}".format(path=self.get_url(urn_to))
header_overwrite = "Overwrite: {flag}".format(flag="T" if overwrite else "F")
self.execute_request(action='move', path=urn_from.quote(), headers_ext=[header_destination, header_overwrite])
@ -672,39 +673,50 @@ class Client(object):
def prune(src, exp):
return [sub(exp, "", item) for item in src]
updated = False
urn = Urn(remote_directory, directory=True)
self._validate_remote_directory(urn)
self._validate_local_directory(local_directory)
if not self.is_dir(urn.path()):
raise OptionNotValid(name="remote_path", value=remote_directory)
if not os.path.isdir(local_directory):
raise OptionNotValid(name="local_path", value=local_directory)
if not os.path.exists(local_directory):
raise LocalResourceNotFound(local_directory)
paths = self.list(urn.path())
expression = "{begin}{end}".format(begin="^", end=urn.path())
remote_resource_names = prune(paths, expression)
for local_resource_name in listdir(local_directory):
local_path = os.path.join(local_directory, local_resource_name)
remote_path = "{remote_directory}{resource_name}".format(remote_directory=urn.path(), resource_name=local_resource_name)
remote_path = "{remote_directory}{resource_name}".format(remote_directory=urn.path(),
resource_name=local_resource_name)
if os.path.isdir(local_path):
if not self.check(remote_path=remote_path):
self.mkdir(remote_path=remote_path)
result = self.push(remote_directory=remote_path, local_directory=local_path)
updated = updated or result
self.push(remote_directory=remote_path, local_directory=local_path)
else:
if local_resource_name in remote_resource_names and not self.is_local_more_recent(local_path, remote_path):
if local_resource_name in remote_resource_names:
continue
self.upload_file(remote_path=remote_path, local_path=local_path)
updated = True
return updated
def pull(self, remote_directory, local_directory):
def prune(src, exp):
return [sub(exp, "", item) for item in src]
updated = False
urn = Urn(remote_directory, directory=True)
self._validate_remote_directory(urn)
self._validate_local_directory(local_directory)
if not self.is_dir(urn.path()):
raise OptionNotValid(name="remote_path", value=remote_directory)
if not os.path.exists(local_directory):
raise LocalResourceNotFound(local_directory)
local_resource_names = listdir(local_directory)
@ -713,62 +725,30 @@ class Client(object):
remote_resource_names = prune(paths, expression)
for remote_resource_name in remote_resource_names:
local_path = os.path.join(local_directory, remote_resource_name)
remote_path = "{remote_directory}{resource_name}".format(remote_directory=urn.path(), resource_name=remote_resource_name)
remote_path = "{remote_directory}{resource_name}".format(remote_directory=urn.path(),
resource_name=remote_resource_name)
remote_urn = Urn(remote_path)
if remote_urn.path().endswith("/"):
if not os.path.exists(local_path):
updated = True
os.mkdir(local_path)
result = self.pull(remote_directory=remote_path, local_directory=local_path)
updated = updated or result
self.pull(remote_directory=remote_path, local_directory=local_path)
else:
if remote_resource_name in local_resource_names and self.is_local_more_recent(local_path, remote_path):
if remote_resource_name in local_resource_names:
continue
self.download_file(remote_path=remote_path, local_path=local_path)
updated = True
return updated
def is_local_more_recent(self, local_path, remote_path):
"""Tells if local resource is more recent that the remote on if possible
:param local_path: the path to local resource.
:param remote_path: the path to remote resource.
:return: True if local resource is more recent, False if the remote one is
None if comparison is not possible
"""
try:
remote_info = self.info(remote_path)
remote_last_mod_date = remote_info['modified']
remote_last_mod_date = dateutil_parser.parse(remote_last_mod_date)
remote_last_mod_date_unix_ts = int(remote_last_mod_date.timestamp())
local_last_mod_date_unix_ts = int(os.stat(local_path).st_mtime)
return remote_last_mod_date_unix_ts < local_last_mod_date_unix_ts
except ValueError or RuntimeWarning or KeyError:
# If there is problem when parsing dates, or cannot get
# last modified information, return None
return None
def sync(self, remote_directory, local_directory):
self.pull(remote_directory=remote_directory, local_directory=local_directory)
self.push(remote_directory=remote_directory, local_directory=local_directory)
def _validate_remote_directory(self, urn):
if not self.is_dir(urn.path()):
raise OptionNotValid(name="remote_path", value=urn.path())
@staticmethod
def _validate_local_directory(local_directory):
if not os.path.isdir(local_directory):
raise OptionNotValid(name="local_path", value=local_directory)
if not os.path.exists(local_directory):
raise LocalResourceNotFound(local_directory)
class Resource(object):
def __init__(self, client, urn):

View file

@ -25,22 +25,21 @@ class WebDAVSettings(ConnectionSettings):
ns = "webdav:"
prefix = "webdav_"
keys = {'hostname', 'login', 'password', 'token', 'root', 'cert_path', 'key_path', 'recv_speed', 'send_speed',
'verbose', 'disable_check', 'override_methods'}
'verbose', 'disable_check'}
hostname = None
login = None
password = None
token = None
root = None
cert_path = None
key_path = None
recv_speed = None
send_speed = None
verbose = None
disable_check = False
def __init__(self, options):
self.hostname = None
self.login = None
self.password = None
self.token = None
self.root = None
self.cert_path = None
self.key_path = None
self.recv_speed = None
self.send_speed = None
self.verbose = None
self.disable_check = False
self.override_methods = {}
self.options = dict()
for key in self.keys:
@ -50,7 +49,6 @@ class WebDAVSettings(ConnectionSettings):
self.root = Urn(self.root).quote() if self.root else ''
self.root = self.root.rstrip(Urn.separate)
self.hostname = self.hostname.rstrip(Urn.separate)
def is_valid(self):
if not self.hostname:

View file

@ -34,11 +34,13 @@ class Urn(object):
return self._path
def filename(self):
path_split = self._path.split(Urn.separate)
name = path_split[-2] + Urn.separate if path_split[-1] == '' else path_split[-1]
return unquote(name)
def parent(self):
path_split = self._path.split(Urn.separate)
nesting_level = self.nesting_level()
parent_path_split = path_split[:nesting_level]