Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions Lib/test/test_free_threading/test_io.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import codecs
import io
import os
import tempfile
import _pyio as pyio
import threading
from unittest import TestCase
Expand Down Expand Up @@ -170,3 +172,52 @@ def reset_worker():
decoder.reset()

run_concurrently([decode_worker] * 2 + [reset_worker] * 2)


class TestFileIO(TestCase):
NTHREADS = 4
ITERS = 200

def _run_io_vs_close(self, open_mode: str, io_func, data: bytes = b"") -> None:
"""Repeatedly race io_func against close on a fresh FileIO object."""
for _ in range(self.ITERS):
with tempfile.NamedTemporaryFile(delete=False) as tmp:
if data:
tmp.write(data)
name = tmp.name
try:
f = io.FileIO(name, open_mode)

def io_worker(f: io.FileIO = f) -> None:
for _ in range(10):
try:
io_func(f)
except (ValueError, OSError):
pass

def closer(f: io.FileIO = f) -> None:
try:
f.close()
except OSError:
pass

run_concurrently([io_worker] * self.NTHREADS + [closer])
finally:
os.unlink(name)

@threading_helper.requires_working_threading()
def test_concurrent_read_and_close(self):
self._run_io_vs_close("rb", lambda f: f.read(256), data=b"x" * 4096)

@threading_helper.requires_working_threading()
def test_concurrent_readinto_and_close(self):
buf = bytearray(256)
self._run_io_vs_close("rb", lambda f: f.readinto(buf), data=b"x" * 4096)

@threading_helper.requires_working_threading()
def test_concurrent_write_and_close(self):
self._run_io_vs_close("wb", lambda f: f.write(b"x" * 256))

@threading_helper.requires_working_threading()
def test_concurrent_seek_and_close(self):
self._run_io_vs_close("rb", lambda f: f.seek(0), data=b"x" * 256)
92 changes: 64 additions & 28 deletions Modules/_io/fileio.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,38 @@ fileio_dealloc_warn(PyObject *op, PyObject *source)
Py_RETURN_NONE;
}

/* Atomic accessors for self->fd.
* Relaxed ordering is sufficient: the OS syscall that follows provides the
* real barrier, and we only need to prevent data-race UB on the int-sized field itself. */
static inline int
fileio_get_fd(fileio *self)
{
return _Py_atomic_load_int_relaxed(&self->fd);
}

static inline void
fileio_set_fd(fileio *self, int fd)
{
_Py_atomic_store_int_relaxed(&self->fd, fd);
}

/* Atomically replace self->fd with -1 and return the old value.
* Used by internal_close so that two concurrent calls to close cannot
* both observe fd >= 0 and both attempt to close the same descriptor. */
static inline int
fileio_exchange_fd(fileio *self)
{
return _Py_atomic_exchange_int(&self->fd, -1);
}

/* Returns 0 on success, -1 with exception set on failure. */
static int
internal_close(fileio *self)
{
int err = 0;
int save_errno = 0;
if (self->fd >= 0) {
int fd = self->fd;
self->fd = -1;
int fd = fileio_exchange_fd(self);
if (fd >= 0) {
/* fd is accessible and someone else may have closed it */
Py_BEGIN_ALLOW_THREADS
_Py_BEGIN_SUPPRESS_IPH
Expand Down Expand Up @@ -167,7 +190,7 @@ _io_FileIO_close_impl(fileio *self, PyTypeObject *cls)
res = PyObject_CallMethodOneArg((PyObject*)state->PyRawIOBase_Type,
&_Py_ID(close), (PyObject *)self);
if (!self->closefd) {
self->fd = -1;
fileio_set_fd(self, -1);
return res;
}

Expand Down Expand Up @@ -609,9 +632,10 @@ static PyObject *
_io_FileIO_fileno_impl(fileio *self)
/*[clinic end generated code: output=a9626ce5398ece90 input=0b9b2de67335ada3]*/
{
if (self->fd < 0)
int fd = fileio_get_fd(self);
if (fd < 0)
return err_closed();
return PyLong_FromLong((long) self->fd);
return PyLong_FromLong((long) fd);
}

/*[clinic input]
Expand All @@ -624,7 +648,7 @@ static PyObject *
_io_FileIO_readable_impl(fileio *self)
/*[clinic end generated code: output=640744a6150fe9ba input=a3fdfed6eea721c5]*/
{
if (self->fd < 0)
if (fileio_get_fd(self) < 0)
return err_closed();
return PyBool_FromLong((long) self->readable);
}
Expand All @@ -639,7 +663,7 @@ static PyObject *
_io_FileIO_writable_impl(fileio *self)
/*[clinic end generated code: output=96cefc5446e89977 input=c204a808ca2e1748]*/
{
if (self->fd < 0)
if (fileio_get_fd(self) < 0)
return err_closed();
return PyBool_FromLong((long) self->writable);
}
Expand All @@ -654,7 +678,7 @@ static PyObject *
_io_FileIO_seekable_impl(fileio *self)
/*[clinic end generated code: output=47909ca0a42e9287 input=c8e5554d2fd63c7f]*/
{
if (self->fd < 0)
if (fileio_get_fd(self) < 0)
return err_closed();
if (self->seekable < 0) {
/* portable_lseek() sets the seekable attribute */
Expand Down Expand Up @@ -686,14 +710,16 @@ _io_FileIO_readinto_impl(fileio *self, PyTypeObject *cls, Py_buffer *buffer)
Py_ssize_t n;
int err;

if (self->fd < 0)
int fd;
fd = fileio_get_fd(self);
if (fd < 0)
return err_closed();
if (!self->readable) {
_PyIO_State *state = get_io_state_by_cls(cls);
return err_mode(state, "reading");
}

n = _Py_read(self->fd, buffer->buf, buffer->len);
n = _Py_read(fd, buffer->buf, buffer->len);
/* copy errno because PyBuffer_Release() can indirectly modify it */
err = errno;

Expand Down Expand Up @@ -754,7 +780,9 @@ _io_FileIO_readall_impl(fileio *self, PyTypeObject *cls)
Py_ssize_t n;
size_t bufsize;

if (self->fd < 0) {
int fd;
fd = fileio_get_fd(self);
if (fd < 0) {
return err_closed();
}
if (!self->readable) {
Expand Down Expand Up @@ -795,9 +823,9 @@ _io_FileIO_readall_impl(fileio *self, PyTypeObject *cls)
Py_BEGIN_ALLOW_THREADS
_Py_BEGIN_SUPPRESS_IPH
#ifdef MS_WINDOWS
pos = _lseeki64(self->fd, 0L, SEEK_CUR);
pos = _lseeki64(fd, 0L, SEEK_CUR);
#else
pos = lseek(self->fd, 0L, SEEK_CUR);
pos = lseek(fd, 0L, SEEK_CUR);
#endif
_Py_END_SUPPRESS_IPH
Py_END_ALLOW_THREADS
Expand Down Expand Up @@ -830,7 +858,7 @@ _io_FileIO_readall_impl(fileio *self, PyTypeObject *cls)
}
}

n = _Py_read(self->fd,
n = _Py_read(fd,
(char*)PyBytesWriter_GetData(writer) + bytes_read,
bufsize - bytes_read);

Expand Down Expand Up @@ -875,7 +903,9 @@ static PyObject *
_io_FileIO_read_impl(fileio *self, PyTypeObject *cls, Py_ssize_t size)
/*[clinic end generated code: output=bbd749c7c224143e input=c7baa3b440af9337]*/
{
if (self->fd < 0)
int fd;
fd = fileio_get_fd(self);
if (fd < 0)
return err_closed();
if (!self->readable) {
_PyIO_State *state = get_io_state_by_cls(cls);
Expand All @@ -895,7 +925,7 @@ _io_FileIO_read_impl(fileio *self, PyTypeObject *cls, Py_ssize_t size)
}
char *ptr = PyBytesWriter_GetData(writer);

Py_ssize_t n = _Py_read(self->fd, ptr, size);
Py_ssize_t n = _Py_read(fd, ptr, size);
if (n == -1) {
// copy errno because PyBytesWriter_Discard() can indirectly modify it
int err = errno;
Expand Down Expand Up @@ -930,14 +960,16 @@ _io_FileIO_write_impl(fileio *self, PyTypeObject *cls, Py_buffer *b)
Py_ssize_t n;
int err;

if (self->fd < 0)
int fd;
fd = fileio_get_fd(self);
if (fd < 0)
return err_closed();
if (!self->writable) {
_PyIO_State *state = get_io_state_by_cls(cls);
return err_mode(state, "writing");
}

n = _Py_write(self->fd, b->buf, b->len);
n = _Py_write(fd, b->buf, b->len);
/* copy errno because PyBuffer_Release() can indirectly modify it */
err = errno;

Expand All @@ -959,7 +991,8 @@ static PyObject *
portable_lseek(fileio *self, PyObject *posobj, int whence, bool suppress_pipe_error)
{
Py_off_t pos, res;
int fd = self->fd;
int fd;
fd = fileio_get_fd(self);

#ifdef SEEK_SET
/* Turn 0, 1, 2 into SEEK_{SET,CUR,END} */
Expand Down Expand Up @@ -1040,7 +1073,7 @@ static PyObject *
_io_FileIO_seek_impl(fileio *self, PyObject *pos, int whence)
/*[clinic end generated code: output=c976acdf054e6655 input=f165a1b4f5d494ad]*/
{
if (self->fd < 0)
if (fileio_get_fd(self) < 0)
return err_closed();

return portable_lseek(self, pos, whence, false);
Expand All @@ -1058,7 +1091,7 @@ static PyObject *
_io_FileIO_tell_impl(fileio *self)
/*[clinic end generated code: output=ffe2147058809d0b input=807e24ead4cec2f9]*/
{
if (self->fd < 0)
if (fileio_get_fd(self) < 0)
return err_closed();

return portable_lseek(self, NULL, 1, false);
Expand Down Expand Up @@ -1086,7 +1119,7 @@ _io_FileIO_truncate_impl(fileio *self, PyTypeObject *cls, PyObject *posobj)
int ret;
int fd;

fd = self->fd;
fd = fileio_get_fd(self);
if (fd < 0)
return err_closed();
if (!self->writable) {
Expand Down Expand Up @@ -1181,7 +1214,8 @@ fileio_repr(PyObject *op)
fileio *self = PyFileIO_CAST(op);
const char *type_name = Py_TYPE(self)->tp_name;

if (self->fd < 0) {
int fd = fileio_get_fd(self);
if (fd < 0) {
return PyUnicode_FromFormat("<%.100s [closed]>", type_name);
}

Expand All @@ -1193,7 +1227,7 @@ fileio_repr(PyObject *op)
if (nameobj == NULL) {
res = PyUnicode_FromFormat(
"<%.100s fd=%d mode='%s' closefd=%s>",
type_name, self->fd, mode_string(self), self->closefd ? "True" : "False");
type_name, fd, mode_string(self), self->closefd ? "True" : "False");
}
else {
int status = Py_ReprEnter((PyObject *)self);
Expand Down Expand Up @@ -1225,11 +1259,13 @@ _io_FileIO_isatty_impl(fileio *self)
{
long res;

if (self->fd < 0)
int fd;
fd = fileio_get_fd(self);
if (fd < 0)
return err_closed();
Py_BEGIN_ALLOW_THREADS
_Py_BEGIN_SUPPRESS_IPH
res = isatty(self->fd);
res = isatty(fd);
_Py_END_SUPPRESS_IPH
Py_END_ALLOW_THREADS
return PyBool_FromLong(res);
Expand Down Expand Up @@ -1281,7 +1317,7 @@ static PyObject *
fileio_get_closed(PyObject *op, void *closure)
{
fileio *self = PyFileIO_CAST(op);
return PyBool_FromLong((long)(self->fd < 0));
return PyBool_FromLong((long)(fileio_get_fd(self) < 0));
}

static PyObject *
Expand Down
Loading