aboutsummaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorNikolaus Rath <Nikolaus@rath.org>2017-06-20 13:53:55 -0700
committerNikolaus Rath <Nikolaus@rath.org>2017-06-22 14:22:57 -0700
commit5f4619bac33a544af4da6e7d28bb4d7dbe45ae92 (patch)
treeab79661111460b9aab6a7b9452dfcf52a59a90de /test
parentb66ecb9c3a1d4a7756ad48c61a8b2e82b08f1e8c (diff)
downloadsshfs-5f4619bac33a544af4da6e7d28bb4d7dbe45ae92.tar
sshfs-5f4619bac33a544af4da6e7d28bb4d7dbe45ae92.tar.gz
sshfs-5f4619bac33a544af4da6e7d28bb4d7dbe45ae92.tar.bz2
sshfs-5f4619bac33a544af4da6e7d28bb4d7dbe45ae92.zip
Added unit tests and travis integration
Diffstat (limited to 'test')
-rw-r--r--test/.gitignore1
-rw-r--r--test/conftest.py89
-rw-r--r--test/lsan_suppress.txt11
-rw-r--r--test/meson.build11
-rw-r--r--test/pytest.ini2
-rwxr-xr-xtest/test_sshfs.py372
-rwxr-xr-xtest/travis-build.sh48
-rwxr-xr-xtest/travis-install.sh21
-rw-r--r--test/util.py100
-rw-r--r--test/wrong_command.c9
10 files changed, 664 insertions, 0 deletions
diff --git a/test/.gitignore b/test/.gitignore
new file mode 100644
index 0000000..c18dd8d
--- /dev/null
+++ b/test/.gitignore
@@ -0,0 +1 @@
+__pycache__/
diff --git a/test/conftest.py b/test/conftest.py
new file mode 100644
index 0000000..70cd0c6
--- /dev/null
+++ b/test/conftest.py
@@ -0,0 +1,89 @@
+import sys
+import pytest
+import time
+import re
+
+# If a test fails, wait a moment before retrieving the captured
+# stdout/stderr. When using a server process, this makes sure that we capture
+# any potential output of the server that comes *after* a test has failed. For
+# example, if a request handler raises an exception, the server first signals an
+# error to FUSE (causing the test to fail), and then logs the exception. Without
+# the extra delay, the exception will go into nowhere.
+@pytest.mark.hookwrapper
+def pytest_pyfunc_call(pyfuncitem):
+ outcome = yield
+ failed = outcome.excinfo is not None
+ if failed:
+ time.sleep(1)
+
+@pytest.fixture()
+def pass_capfd(request, capfd):
+ '''Provide capfd object to UnitTest instances'''
+ request.instance.capfd = capfd
+
+def check_test_output(capfd):
+ (stdout, stderr) = capfd.readouterr()
+
+ # Write back what we've read (so that it will still be printed.
+ sys.stdout.write(stdout)
+ sys.stderr.write(stderr)
+
+ # Strip out false positives
+ for (pattern, flags, count) in capfd.false_positives:
+ cp = re.compile(pattern, flags)
+ (stdout, cnt) = cp.subn('', stdout, count=count)
+ if count == 0 or count - cnt > 0:
+ stderr = cp.sub('', stderr, count=count - cnt)
+
+ patterns = [ r'\b{}\b'.format(x) for x in
+ ('exception', 'error', 'warning', 'fatal', 'traceback',
+ 'fault', 'crash(?:ed)?', 'abort(?:ed)',
+ 'uninitiali[zs]ed') ]
+ patterns += ['^==[0-9]+== ']
+ for pattern in patterns:
+ cp = re.compile(pattern, re.IGNORECASE | re.MULTILINE)
+ hit = cp.search(stderr)
+ if hit:
+ raise AssertionError('Suspicious output to stderr (matched "%s")' % hit.group(0))
+ hit = cp.search(stdout)
+ if hit:
+ raise AssertionError('Suspicious output to stdout (matched "%s")' % hit.group(0))
+
+def register_output(self, pattern, count=1, flags=re.MULTILINE):
+ '''Register *pattern* as false positive for output checking
+
+ This prevents the test from failing because the output otherwise
+ appears suspicious.
+ '''
+
+ self.false_positives.append((pattern, flags, count))
+
+# This is a terrible hack that allows us to access the fixtures from the
+# pytest_runtest_call hook. Among a lot of other hidden assumptions, it probably
+# relies on tests running sequential (i.e., don't dare to use e.g. the xdist
+# plugin)
+current_capfd = None
+@pytest.yield_fixture(autouse=True)
+def save_cap_fixtures(request, capfd):
+ global current_capfd
+ capfd.false_positives = []
+
+ # Monkeypatch in a function to register false positives
+ type(capfd).register_output = register_output
+
+ if request.config.getoption('capture') == 'no':
+ capfd = None
+ current_capfd = capfd
+ bak = current_capfd
+ yield
+
+ # Try to catch problems with this hack (e.g. when running tests
+ # simultaneously)
+ assert bak is current_capfd
+ current_capfd = None
+
+@pytest.hookimpl(trylast=True)
+def pytest_runtest_call(item):
+ capfd = current_capfd
+ if capfd is not None:
+ check_test_output(capfd)
diff --git a/test/lsan_suppress.txt b/test/lsan_suppress.txt
new file mode 100644
index 0000000..4352b3a
--- /dev/null
+++ b/test/lsan_suppress.txt
@@ -0,0 +1,11 @@
+# Suppression file for address sanitizer.
+
+# There are some leaks in command line option parsing. They should be
+# fixed at some point, but are harmless since the consume just a small,
+# constant amount of memory and do not grow.
+leak:fuse_opt_parse
+
+
+# Leaks in fusermount3 are harmless as well (it's a short-lived
+# process) - but patches are welcome!
+leak:fusermount.c
diff --git a/test/meson.build b/test/meson.build
new file mode 100644
index 0000000..90eb94e
--- /dev/null
+++ b/test/meson.build
@@ -0,0 +1,11 @@
+test_scripts = [ 'conftest.py', 'pytest.ini', 'test_sshfs.py',
+ 'util.py' ]
+custom_target('test_scripts', input: test_scripts,
+ output: test_scripts, build_by_default: true,
+ command: ['cp', '-fPu', '--preserve=mode',
+ '@INPUT@', meson.current_build_dir() ])
+
+# Provide something helpful when running 'ninja test'
+wrong_cmd = executable('wrong_command', 'wrong_command.c',
+ install: false)
+test('wrong_cmd', wrong_cmd)
diff --git a/test/pytest.ini b/test/pytest.ini
new file mode 100644
index 0000000..9516154
--- /dev/null
+++ b/test/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+addopts = --verbose --assert=rewrite --tb=native -x -r a
diff --git a/test/test_sshfs.py b/test/test_sshfs.py
new file mode 100755
index 0000000..e173f64
--- /dev/null
+++ b/test/test_sshfs.py
@@ -0,0 +1,372 @@
+#!/usr/bin/env python3
+
+if __name__ == '__main__':
+ import pytest
+ import sys
+ sys.exit(pytest.main([__file__] + sys.argv[1:]))
+
+import subprocess
+import os
+import sys
+import pytest
+import stat
+import shutil
+import filecmp
+import errno
+from tempfile import NamedTemporaryFile
+from util import (wait_for_mount, umount, cleanup, base_cmdline,
+ basename, fuse_test_marker, safe_sleep)
+from os.path import join as pjoin
+
+TEST_FILE = __file__
+
+pytestmark = fuse_test_marker()
+
+with open(TEST_FILE, 'rb') as fh:
+ TEST_DATA = fh.read()
+
+def name_generator(__ctr=[0]):
+ __ctr[0] += 1
+ return 'testfile_%d' % __ctr[0]
+
+@pytest.mark.parametrize("debug", (False, True))
+@pytest.mark.parametrize("cache_timeout", (0, 1))
+def test_sshfs(tmpdir, debug, cache_timeout, capfd):
+
+ # Avoid false positives from debug messages
+ #if debug:
+ # capfd.register_output(r'^ unique: [0-9]+, error: -[0-9]+ .+$',
+ # count=0)
+
+ # Test if we can ssh into localhost without password
+ try:
+ res = subprocess.call(['ssh', '-o', 'KbdInteractiveAuthentication=no',
+ '-o', 'ChallengeResponseAuthentication=no',
+ '-o', 'PasswordAuthentication=no',
+ 'localhost', '--', 'true'], stdin=subprocess.DEVNULL,
+ timeout=10)
+ except subprocess.TimeoutExpired:
+ res = 1
+ if res != 0:
+ pytest.fail('Unable to ssh into localhost without password prompt.')
+
+ mnt_dir = str(tmpdir.mkdir('mnt'))
+ src_dir = str(tmpdir.mkdir('src'))
+
+ cmdline = base_cmdline + [ pjoin(basename, 'sshfs'),
+ '-f', 'localhost:' + src_dir, mnt_dir ]
+ if debug:
+ cmdline += [ '-o', 'sshfs_debug' ]
+
+ # SSHFS Cache
+ if cache_timeout == 0:
+ cmdline += [ '-o', 'cache=no' ]
+ else:
+ cmdline += [ '-o', 'cache_timeout=%d' % cache_timeout ]
+
+ # FUSE Cache
+ cmdline += [ '-o', 'entry_timeout=0',
+ '-o', 'attr_timeout=0' ]
+
+ mount_process = subprocess.Popen(cmdline)
+ try:
+ wait_for_mount(mount_process, mnt_dir)
+
+ tst_statvfs(mnt_dir)
+ tst_readdir(src_dir, mnt_dir)
+ tst_open_read(src_dir, mnt_dir)
+ tst_open_write(src_dir, mnt_dir)
+ tst_create(mnt_dir)
+ tst_passthrough(src_dir, mnt_dir, cache_timeout)
+ tst_mkdir(mnt_dir)
+ tst_rmdir(src_dir, mnt_dir, cache_timeout)
+ tst_unlink(src_dir, mnt_dir, cache_timeout)
+ tst_symlink(mnt_dir)
+ if os.getuid() == 0:
+ tst_chown(mnt_dir)
+
+ # SSHFS only supports one second resolution when setting
+ # file timestamps.
+ tst_utimens(mnt_dir, tol=1)
+
+ tst_link(mnt_dir)
+ tst_truncate_path(mnt_dir)
+ tst_truncate_fd(mnt_dir)
+ tst_open_unlink(mnt_dir)
+ except:
+ cleanup(mnt_dir)
+ raise
+ else:
+ umount(mount_process, mnt_dir)
+
+def tst_unlink(src_dir, mnt_dir, cache_timeout):
+ name = name_generator()
+ fullname = mnt_dir + "/" + name
+ with open(pjoin(src_dir, name), 'wb') as fh:
+ fh.write(b'hello')
+ if cache_timeout:
+ safe_sleep(cache_timeout+1)
+ assert name in os.listdir(mnt_dir)
+ os.unlink(fullname)
+ with pytest.raises(OSError) as exc_info:
+ os.stat(fullname)
+ assert exc_info.value.errno == errno.ENOENT
+ assert name not in os.listdir(mnt_dir)
+ assert name not in os.listdir(src_dir)
+
+def tst_mkdir(mnt_dir):
+ dirname = name_generator()
+ fullname = mnt_dir + "/" + dirname
+ os.mkdir(fullname)
+ fstat = os.stat(fullname)
+ assert stat.S_ISDIR(fstat.st_mode)
+ assert os.listdir(fullname) == []
+ assert fstat.st_nlink in (1,2)
+ assert dirname in os.listdir(mnt_dir)
+
+def tst_rmdir(src_dir, mnt_dir, cache_timeout):
+ name = name_generator()
+ fullname = mnt_dir + "/" + name
+ os.mkdir(pjoin(src_dir, name))
+ if cache_timeout:
+ safe_sleep(cache_timeout+1)
+ assert name in os.listdir(mnt_dir)
+ os.rmdir(fullname)
+ with pytest.raises(OSError) as exc_info:
+ os.stat(fullname)
+ assert exc_info.value.errno == errno.ENOENT
+ assert name not in os.listdir(mnt_dir)
+ assert name not in os.listdir(src_dir)
+
+def tst_symlink(mnt_dir):
+ linkname = name_generator()
+ fullname = mnt_dir + "/" + linkname
+ os.symlink("/imaginary/dest", fullname)
+ fstat = os.lstat(fullname)
+ assert stat.S_ISLNK(fstat.st_mode)
+ assert os.readlink(fullname) == "/imaginary/dest"
+ assert fstat.st_nlink == 1
+ assert linkname in os.listdir(mnt_dir)
+
+def tst_create(mnt_dir):
+ name = name_generator()
+ fullname = pjoin(mnt_dir, name)
+ with pytest.raises(OSError) as exc_info:
+ os.stat(fullname)
+ assert exc_info.value.errno == errno.ENOENT
+ assert name not in os.listdir(mnt_dir)
+
+ fd = os.open(fullname, os.O_CREAT | os.O_RDWR)
+ os.close(fd)
+
+ assert name in os.listdir(mnt_dir)
+ fstat = os.lstat(fullname)
+ assert stat.S_ISREG(fstat.st_mode)
+ assert fstat.st_nlink == 1
+ assert fstat.st_size == 0
+
+def tst_chown(mnt_dir):
+ filename = pjoin(mnt_dir, name_generator())
+ os.mkdir(filename)
+ fstat = os.lstat(filename)
+ uid = fstat.st_uid
+ gid = fstat.st_gid
+
+ uid_new = uid + 1
+ os.chown(filename, uid_new, -1)
+ fstat = os.lstat(filename)
+ assert fstat.st_uid == uid_new
+ assert fstat.st_gid == gid
+
+ gid_new = gid + 1
+ os.chown(filename, -1, gid_new)
+ fstat = os.lstat(filename)
+ assert fstat.st_uid == uid_new
+ assert fstat.st_gid == gid_new
+
+def tst_open_read(src_dir, mnt_dir):
+ name = name_generator()
+ with open(pjoin(src_dir, name), 'wb') as fh_out, \
+ open(TEST_FILE, 'rb') as fh_in:
+ shutil.copyfileobj(fh_in, fh_out)
+
+ assert filecmp.cmp(pjoin(mnt_dir, name), TEST_FILE, False)
+
+def tst_open_write(src_dir, mnt_dir):
+ name = name_generator()
+ fd = os.open(pjoin(src_dir, name),
+ os.O_CREAT | os.O_RDWR)
+ os.close(fd)
+ fullname = pjoin(mnt_dir, name)
+ with open(fullname, 'wb') as fh_out, \
+ open(TEST_FILE, 'rb') as fh_in:
+ shutil.copyfileobj(fh_in, fh_out)
+
+ assert filecmp.cmp(fullname, TEST_FILE, False)
+
+def tst_open_unlink(mnt_dir):
+ name = pjoin(mnt_dir, name_generator())
+ data1 = b'foo'
+ data2 = b'bar'
+ fullname = pjoin(mnt_dir, name)
+ with open(fullname, 'wb+', buffering=0) as fh:
+ fh.write(data1)
+ os.unlink(fullname)
+ with pytest.raises(OSError) as exc_info:
+ os.stat(fullname)
+ assert exc_info.value.errno == errno.ENOENT
+ assert name not in os.listdir(mnt_dir)
+ fh.write(data2)
+ fh.seek(0)
+ assert fh.read() == data1+data2
+
+def tst_statvfs(mnt_dir):
+ os.statvfs(mnt_dir)
+
+def tst_link(mnt_dir):
+ name1 = pjoin(mnt_dir, name_generator())
+ name2 = pjoin(mnt_dir, name_generator())
+ shutil.copyfile(TEST_FILE, name1)
+ assert filecmp.cmp(name1, TEST_FILE, False)
+
+ fstat1 = os.lstat(name1)
+ assert fstat1.st_nlink == 1
+
+ os.link(name1, name2)
+
+ fstat1 = os.lstat(name1)
+ fstat2 = os.lstat(name2)
+ for attr in ('st_mode', 'st_dev', 'st_uid', 'st_gid',
+ 'st_size', 'st_atime', 'st_mtime', 'st_ctime'):
+ assert getattr(fstat1, attr) == getattr(fstat2, attr)
+ assert os.path.basename(name2) in os.listdir(mnt_dir)
+ assert filecmp.cmp(name1, name2, False)
+
+ os.unlink(name2)
+
+ assert os.path.basename(name2) not in os.listdir(mnt_dir)
+ with pytest.raises(FileNotFoundError):
+ os.lstat(name2)
+
+ os.unlink(name1)
+
+def tst_readdir(src_dir, mnt_dir):
+ newdir = name_generator()
+ src_newdir = pjoin(src_dir, newdir)
+ mnt_newdir = pjoin(mnt_dir, newdir)
+ file_ = src_newdir + "/" + name_generator()
+ subdir = src_newdir + "/" + name_generator()
+ subfile = subdir + "/" + name_generator()
+
+ os.mkdir(src_newdir)
+ shutil.copyfile(TEST_FILE, file_)
+ os.mkdir(subdir)
+ shutil.copyfile(TEST_FILE, subfile)
+
+ listdir_is = os.listdir(mnt_newdir)
+ listdir_is.sort()
+ listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ]
+ listdir_should.sort()
+ assert listdir_is == listdir_should
+
+ os.unlink(file_)
+ os.unlink(subfile)
+ os.rmdir(subdir)
+ os.rmdir(src_newdir)
+
+def tst_truncate_path(mnt_dir):
+ assert len(TEST_DATA) > 1024
+
+ filename = pjoin(mnt_dir, name_generator())
+ with open(filename, 'wb') as fh:
+ fh.write(TEST_DATA)
+
+ fstat = os.stat(filename)
+ size = fstat.st_size
+ assert size == len(TEST_DATA)
+
+ # Add zeros at the end
+ os.truncate(filename, size + 1024)
+ assert os.stat(filename).st_size == size + 1024
+ with open(filename, 'rb') as fh:
+ assert fh.read(size) == TEST_DATA
+ assert fh.read(1025) == b'\0' * 1024
+
+ # Truncate data
+ os.truncate(filename, size - 1024)
+ assert os.stat(filename).st_size == size - 1024
+ with open(filename, 'rb') as fh:
+ assert fh.read(size) == TEST_DATA[:size-1024]
+
+ os.unlink(filename)
+
+def tst_truncate_fd(mnt_dir):
+ assert len(TEST_DATA) > 1024
+ with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh:
+ fd = fh.fileno()
+ fh.write(TEST_DATA)
+ fstat = os.fstat(fd)
+ size = fstat.st_size
+ assert size == len(TEST_DATA)
+
+ # Add zeros at the end
+ os.ftruncate(fd, size + 1024)
+ assert os.fstat(fd).st_size == size + 1024
+ fh.seek(0)
+ assert fh.read(size) == TEST_DATA
+ assert fh.read(1025) == b'\0' * 1024
+
+ # Truncate data
+ os.ftruncate(fd, size - 1024)
+ assert os.fstat(fd).st_size == size - 1024
+ fh.seek(0)
+ assert fh.read(size) == TEST_DATA[:size-1024]
+
+def tst_utimens(mnt_dir, tol=0):
+ filename = pjoin(mnt_dir, name_generator())
+ os.mkdir(filename)
+ fstat = os.lstat(filename)
+
+ atime = fstat.st_atime + 42.28
+ mtime = fstat.st_mtime - 42.23
+ if sys.version_info < (3,3):
+ os.utime(filename, (atime, mtime))
+ else:
+ atime_ns = fstat.st_atime_ns + int(42.28*1e9)
+ mtime_ns = fstat.st_mtime_ns - int(42.23*1e9)
+ os.utime(filename, None, ns=(atime_ns, mtime_ns))
+
+ fstat = os.lstat(filename)
+
+ assert abs(fstat.st_atime - atime) < tol
+ assert abs(fstat.st_mtime - mtime) < tol
+ if sys.version_info >= (3,3):
+ assert abs(fstat.st_atime_ns - atime_ns) < tol*1e9
+ assert abs(fstat.st_mtime_ns - mtime_ns) < tol*1e9
+
+def tst_passthrough(src_dir, mnt_dir, cache_timeout):
+ name = name_generator()
+ src_name = pjoin(src_dir, name)
+ mnt_name = pjoin(src_dir, name)
+ assert name not in os.listdir(src_dir)
+ assert name not in os.listdir(mnt_dir)
+ with open(src_name, 'w') as fh:
+ fh.write('Hello, world')
+ assert name in os.listdir(src_dir)
+ if cache_timeout:
+ safe_sleep(cache_timeout+1)
+ assert name in os.listdir(mnt_dir)
+ assert os.stat(src_name) == os.stat(mnt_name)
+
+ name = name_generator()
+ src_name = pjoin(src_dir, name)
+ mnt_name = pjoin(src_dir, name)
+ assert name not in os.listdir(src_dir)
+ assert name not in os.listdir(mnt_dir)
+ with open(mnt_name, 'w') as fh:
+ fh.write('Hello, world')
+ assert name in os.listdir(src_dir)
+ if cache_timeout:
+ safe_sleep(cache_timeout+1)
+ assert name in os.listdir(mnt_dir)
+ assert os.stat(src_name) == os.stat(mnt_name)
diff --git a/test/travis-build.sh b/test/travis-build.sh
new file mode 100755
index 0000000..ba04295
--- /dev/null
+++ b/test/travis-build.sh
@@ -0,0 +1,48 @@
+#!/bin/bash
+
+set -e
+
+# Disable leak checking for now, there are some issues (or false positives)
+# that we still need to fix
+export ASAN_OPTIONS="detect_leaks=0"
+
+export LSAN_OPTIONS="suppressions=$(pwd)/test/lsan_suppress.txt"
+export CC
+
+TEST_CMD="python3 -m pytest --maxfail=99 test/"
+
+# Standard build with Valgrind
+for CC in gcc gcc-6 clang; do
+ mkdir build-${CC}; cd build-${CC}
+ if [ ${CC} == 'gcc-6' ]; then
+ build_opts='-D b_lundef=false'
+ else
+ build_opts=''
+ fi
+ meson -D werror=true ${build_opts} ../
+ ninja
+
+ TEST_WITH_VALGRIND=true ${TEST_CMD}
+ cd ..
+done
+(cd build-$CC; sudo ninja install)
+
+# Sanitized build
+CC=clang
+for san in undefined address; do
+ mkdir build-${san}; cd build-${san}
+ # b_lundef=false is required to work around clang
+ # bug, cf. https://groups.google.com/forum/#!topic/mesonbuild/tgEdAXIIdC4
+ meson -D b_sanitize=${san} -D b_lundef=false -D werror=true ..
+ ninja
+ ${TEST_CMD}
+ cd ..
+done
+
+# Autotools build
+CC=gcc
+autoreconf -i
+./configure
+make
+${TEST_CMD}
+sudo make install
diff --git a/test/travis-install.sh b/test/travis-install.sh
new file mode 100755
index 0000000..d7d1f05
--- /dev/null
+++ b/test/travis-install.sh
@@ -0,0 +1,21 @@
+#!/bin/sh
+
+set -e
+
+sudo ln -svf $(which python3) /usr/bin/python3
+sudo python3 -m pip install pytest meson
+wget https://github.com/ninja-build/ninja/releases/download/v1.7.2/ninja-linux.zip
+unzip ninja-linux.zip
+chmod 755 ninja
+sudo chown root:root ninja
+sudo mv -fv ninja /usr/local/bin
+valgrind --version
+ninja --version
+meson --version
+
+# Setup ssh
+ssh-keygen -b 768 -t rsa -f ~/.ssh/id_rsa -P ''
+cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
+chmod 600 ~/.ssh/authorized_keys
+ssh -o "StrictHostKeyChecking=no" localhost echo "SSH connection succeeded"
+
diff --git a/test/util.py b/test/util.py
new file mode 100644
index 0000000..f2a485c
--- /dev/null
+++ b/test/util.py
@@ -0,0 +1,100 @@
+#!/usr/bin/env python3
+import subprocess
+import pytest
+import os
+import stat
+import time
+from os.path import join as pjoin
+
+basename = pjoin(os.path.dirname(__file__), '..')
+
+def wait_for_mount(mount_process, mnt_dir,
+ test_fn=os.path.ismount):
+ elapsed = 0
+ while elapsed < 30:
+ if test_fn(mnt_dir):
+ return True
+ if mount_process.poll() is not None:
+ pytest.fail('file system process terminated prematurely')
+ time.sleep(0.1)
+ elapsed += 0.1
+ pytest.fail("mountpoint failed to come up")
+
+def cleanup(mnt_dir):
+ subprocess.call(['fusermount', '-z', '-u', mnt_dir],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.STDOUT)
+
+def umount(mount_process, mnt_dir):
+ subprocess.check_call(['fusermount', '-z', '-u', mnt_dir ])
+ assert not os.path.ismount(mnt_dir)
+
+ # Give mount process a little while to terminate. Popen.wait(timeout)
+ # was only added in 3.3...
+ elapsed = 0
+ while elapsed < 30:
+ code = mount_process.poll()
+ if code is not None:
+ if code == 0:
+ return
+ pytest.fail('file system process terminated with code %s' % (code,))
+ time.sleep(0.1)
+ elapsed += 0.1
+ pytest.fail('mount process did not terminate')
+
+def safe_sleep(secs):
+ '''Like time.sleep(), but sleep for at least *secs*
+
+ `time.sleep` may sleep less than the given period if a signal is
+ received. This function ensures that we sleep for at least the
+ desired time.
+ '''
+
+ now = time.time()
+ end = now + secs
+ while now < end:
+ time.sleep(end - now)
+ now = time.time()
+
+def fuse_test_marker():
+ '''Return a pytest.marker that indicates FUSE availability
+
+ If system/user/environment does not support FUSE, return
+ a `pytest.mark.skip` object with more details. If FUSE is
+ supported, return `pytest.mark.uses_fuse()`.
+ '''
+
+ skip = lambda x: pytest.mark.skip(reason=x)
+
+ with subprocess.Popen(['which', 'fusermount'], stdout=subprocess.PIPE,
+ universal_newlines=True) as which:
+ fusermount_path = which.communicate()[0].strip()
+
+ if not fusermount_path or which.returncode != 0:
+ return skip("Can't find fusermount executable")
+
+ if not os.path.exists('/dev/fuse'):
+ return skip("FUSE kernel module does not seem to be loaded")
+
+ if os.getuid() == 0:
+ return pytest.mark.uses_fuse()
+
+ mode = os.stat(fusermount_path).st_mode
+ if mode & stat.S_ISUID == 0:
+ return skip('fusermount executable not setuid, and we are not root.')
+
+ try:
+ fd = os.open('/dev/fuse', os.O_RDWR)
+ except OSError as exc:
+ return skip('Unable to open /dev/fuse: %s' % exc.strerror)
+ else:
+ os.close(fd)
+
+ return pytest.mark.uses_fuse()
+
+# Use valgrind if requested
+if os.environ.get('TEST_WITH_VALGRIND', 'no').lower().strip() \
+ not in ('no', 'false', '0'):
+ base_cmdline = [ 'valgrind', '-q', '--' ]
+else:
+ base_cmdline = []
diff --git a/test/wrong_command.c b/test/wrong_command.c
new file mode 100644
index 0000000..8366a98
--- /dev/null
+++ b/test/wrong_command.c
@@ -0,0 +1,9 @@
+#include <stdio.h>
+
+int main(void) {
+ fprintf(stderr, "\x1B[31m\e[1m"
+ "This is not the command you are looking for.\n"
+ "You probably want to run 'python3 -m pytest test/' instead"
+ "\e[0m\n");
+ return 1;
+}