import logging
import multiprocessing
import os
import re
from dataclasses import dataclass
from io import BytesIO
from itertools import repeat, takewhile
from pathlib import Path
from typing import TYPE_CHECKING, Any, List, Mapping, Optional, Union
from urllib.parse import ParseResult as UrlParseResult
from urllib.parse import urlparse
import numpy as np
if TYPE_CHECKING:
from scipy import sparse
else:
try:
from scipy import sparse
except ImportError:
sparse = None
from .bed_reader import (
check_file_cloud,
read_cloud_f32,
read_cloud_f64,
read_cloud_i8,
read_f32,
read_f64,
read_i8,
url_to_bytes,
)
# https://stackoverflow.com/questions/845058/how-to-get-line-count-of-a-large-file-cheaply-in-python
def _rawincount(f):
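r"""Count lines by scanning 1 MiB chunks for b"\n"; assumes every line,
including the last, ends with a newline (e.g., b"a\nb\n" counts as 2)."""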
f.seek(0)
bufgen = takewhile(lambda x: x, (f.read(1024 * 1024) for _ in repeat(None)))
return sum(buf.count(b"\n") for buf in bufgen)
@dataclass
class _MetaMeta:
suffix: str
column: int
dtype: type
missing_value: object
fill_sequence: object
def _all_same(key, length, missing, dtype):
if np.issubdtype(dtype, np.str_):
dtype = f"<U{len(missing)}"
return np.full(length, missing, dtype=dtype)
def _sequence(key, length, missing, dtype):
if np.issubdtype(dtype, np.str_):
longest = len(f"{key}{length}")
dtype = f"<U{longest}"
return np.fromiter(
(f"{key}{i + 1}" for i in range(length)), dtype=dtype, count=length,
)
_delimiters = {"fam": r"\s+", "bim": "\t"}
_count_name = {"fam": "iid_count", "bim": "sid_count"}
_meta_meta = {
# https://stackoverflow.com/questions/41921255/staticmethod-object-is-not-callable
"fid": _MetaMeta("fam", 0, np.str_, "0", _all_same),
"iid": _MetaMeta("fam", 1, np.str_, None, _sequence),
"father": _MetaMeta("fam", 2, np.str_, "0", _all_same),
"mother": _MetaMeta("fam", 3, np.str_, "0", _all_same),
"sex": _MetaMeta("fam", 4, np.int32, 0, _all_same),
"pheno": _MetaMeta("fam", 5, np.str_, "0", _all_same),
"chromosome": _MetaMeta("bim", 0, np.str_, "0", _all_same),
"sid": _MetaMeta("bim", 1, np.str_, None, _sequence),
"cm_position": _MetaMeta("bim", 2, np.float32, 0, _all_same),
"bp_position": _MetaMeta("bim", 3, np.int32, 0, _all_same),
"allele_1": _MetaMeta("bim", 4, np.str_, "A1", _all_same),
"allele_2": _MetaMeta("bim", 5, np.str_, "A2", _all_same),
}
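# Illustrative sketch of the fill functions above: when defaults are needed
# (use_fill_sequence=True in _fix_up_properties), a missing property is
# synthesized by its fill function. For example, _all_same("sex", 3, 0, np.int32)
# gives array([0, 0, 0], dtype=int32) and _sequence("sid", 3, None, np.str_)
# gives array(['sid1', 'sid2', 'sid3'], dtype='<U4').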
def get_num_threads(num_threads=None):
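"""Resolve the thread count: an explicit argument wins; otherwise the first
of the environment variables PST_NUM_THREADS, NUM_THREADS, and
MKL_NUM_THREADS that is set; otherwise the number of processors."""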
if num_threads is not None:
return num_threads
if "PST_NUM_THREADS" in os.environ:
return int(os.environ["PST_NUM_THREADS"])
if "NUM_THREADS" in os.environ:
return int(os.environ["NUM_THREADS"])
if "MKL_NUM_THREADS" in os.environ:
return int(os.environ["MKL_NUM_THREADS"])
return multiprocessing.cpu_count()
def get_max_concurrent_requests(max_concurrent_requests=None):
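"""Resolve the concurrent-request limit for cloud reads: an explicit
argument wins; otherwise the default of 10."""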
if max_concurrent_requests is not None:
return max_concurrent_requests
return 10
def get_max_chunk_bytes(max_chunk_bytes=None):
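"""Resolve the per-request byte limit for cloud reads: an explicit argument
wins; otherwise the default of 8,000,000 bytes (8 MB)."""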
if max_chunk_bytes is not None:
return max_chunk_bytes
return 8_000_000
class open_bed:
"""Open a PLINK .bed file, local or cloud, for reading.
Parameters
----------
location: pathlib.Path or str
File path or URL to the .bed file. See :doc:`cloud_urls` for details on cloud URLs.
iid_count: None or int, optional
Number of individuals (samples) in the .bed file.
The default (``iid_count=None``) finds the number
automatically by quickly scanning the .fam file.
sid_count: None or int, optional
Number of SNPs (variants) in the .bed file.
The default (``sid_count=None``) finds the number
automatically by quickly scanning the .bim file.
properties: dict, optional
A dictionary of any replacement properties. The default is an empty dictionary.
The keys of the dictionary are the names of the properties to replace.
The possible keys are:
"fid" (family id), "iid" (individual or sample id), "father" (father id),
"mother" (mother id), "sex", "pheno" (phenotype), "chromosome", "sid"
(SNP or variant id), "cm_position" (centimorgan position), "bp_position"
(base-pair position), "allele_1", "allele_2".
The values are replacement lists or arrays. A value can also be `None`,
meaning do not read or offer this property. See examples, below.
The list or array will be converted to a :class:`numpy.ndarray`
of the appropriate dtype, if necessary. Any :data:`numpy.nan` values
will be converted to the appropriate missing value. The PLINK `.fam specification
<https://www.cog-genomics.org/plink2/formats#fam>`_
and `.bim specification <https://www.cog-genomics.org/plink2/formats#bim>`_
list the dtypes and missing values for each property.
count_A1: bool, optional
True (default) to count the number of A1 alleles (the PLINK standard).
False to count the number of A2 alleles.
num_threads: None or int, optional
The number of threads with which to read data. Defaults to all available
processors.
Can also be set with these environment variables (listed in priority order):
'PST_NUM_THREADS', 'NUM_THREADS', 'MKL_NUM_THREADS'.
skip_format_check: bool, optional
False (default) to immediately check for expected starting bytes in
the .bed file. True to delay the check until (and if) data is read.
fam_location: pathlib.Path or str or URL, optional
Path to the file containing information about each individual (sample).
Defaults to replacing the .bed file’s suffix with .fam.
bim_location: pathlib.Path or str or URL, optional
Path to the file containing information about each SNP (variant).
Defaults to replacing the .bed file’s suffix with .bim.
cloud_options: dict, optional
A dictionary of options for reading from cloud storage. The default is an
empty dictionary.
max_concurrent_requests: None or int, optional
The maximum number of concurrent requests to make to the cloud storage service.
Defaults to 10.
max_chunk_bytes: None or int, optional
The maximum number of bytes to read in a single request to the cloud storage
service. Defaults to 8MB.
filepath: same as location
Deprecated. Use location instead.
fam_filepath: same as fam_location
Deprecated. Use fam_location instead.
bim_filepath: same as bim_location
Deprecated. Use bim_location instead.
Returns
-------
open_bed
an open_bed object
Examples
--------
Open a local file and list individual (sample) :attr:`iid` and SNP (variant) :attr:`sid`.
Then, :meth:`read` the whole file.
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> bed = open_bed(file_name)
>>> print(bed.iid)
['iid1' 'iid2' 'iid3']
>>> print(bed.sid)
['sid1' 'sid2' 'sid3' 'sid4']
>>> print(bed.read())
[[ 1. 0. nan 0.]
[ 2. 0. nan 2.]
[ 0. 1. 2. 0.]]
>>> del bed # optional: delete bed object
Open a cloud file with a non-default timeout.
Then, read the data for one SNP (variant)
at index position 2.
See :doc:`cloud_urls` for details on reading files from cloud storage.
.. doctest::
>>> import numpy as np
>>> with open_bed("https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/small.bed",
... cloud_options={"timeout": "10s"}) as bed:
... print(bed.read(np.s_[:,2]))
[[nan]
[nan]
[ 2.]]
With the local file, replace :attr:`iid`.
>>> bed = open_bed(file_name, properties={"iid":["sample1","sample2","sample3"]})
>>> print(bed.iid) # replaced
['sample1' 'sample2' 'sample3']
>>> print(bed.sid) # same as before
['sid1' 'sid2' 'sid3' 'sid4']
Give the number of individuals (samples) and SNPs (variants) so that the .fam and
.bim files need never be opened.
>>> with open_bed(file_name, iid_count=3, sid_count=4) as bed:
... print(bed.read())
[[ 1. 0. nan 0.]
[ 2. 0. nan 2.]
[ 0. 1. 2. 0.]]
Mark some properties as "don’t read or offer".
>>> bed = open_bed(file_name, properties={
... "father" : None, "mother" : None, "sex" : None, "pheno" : None,
... "allele_1" : None, "allele_2":None })
>>> print(bed.iid) # read from file
['iid1' 'iid2' 'iid3']
>>> print(bed.allele_2) # not read and not offered
None
See :meth:`read` for details of reading batches via slicing and fancy indexing.
"""
def __init__(
self,
location: Union[str, Path, UrlParseResult],
iid_count: Optional[int] = None,
sid_count: Optional[int] = None,
properties: Mapping[str, List[Any]] = {},
count_A1: bool = True,
num_threads: Optional[int] = None,
skip_format_check: bool = False,
fam_location: Optional[Union[str, Path, UrlParseResult]] = None,
bim_location: Optional[Union[str, Path, UrlParseResult]] = None,
cloud_options: Mapping[str, str] = {},
max_concurrent_requests: Optional[int] = None,
max_chunk_bytes: Optional[int] = None,
# accept old keywords
filepath: Optional[Union[str, Path]] = None,
fam_filepath: Optional[Union[str, Path]] = None,
bim_filepath: Optional[Union[str, Path]] = None,
) -> None:
location = self._combined(location, filepath, "location", "filepath")
fam_location = self._combined(
fam_location, fam_filepath, "fam_location", "fam_filepath",
)
bim_location = self._combined(
bim_location, bim_filepath, "bim_location", "bim_filepath",
)
self.location = self._path_or_url(location)
self.cloud_options = cloud_options
self.count_A1 = count_A1
self._num_threads = num_threads
self._max_concurrent_requests = max_concurrent_requests
self._max_chunk_bytes = max_chunk_bytes
self.skip_format_check = skip_format_check
self._fam_location = (
self._path_or_url(fam_location)
if fam_location is not None
else self._replace_extension(self.location, "fam")
)
self._bim_location = (
self._path_or_url(bim_location)
if bim_location is not None
else self._replace_extension(self.location, "bim")
)
self.properties_dict, self._counts = open_bed._fix_up_properties(
properties, iid_count, sid_count, use_fill_sequence=False,
)
self._iid_range = None
self._sid_range = None
if not self.skip_format_check:
if self._is_url(self.location):
check_file_cloud(self.location.geturl(), self.cloud_options)
else:
with open(self.location, "rb") as filepointer:
self._mode = self._check_file(filepointer)
@staticmethod
def _combined(location, filepath, location_name, filepath_name):
if location is not None and filepath is not None:
msg = f"Cannot set both {location_name} and {filepath_name}"
raise ValueError(msg)
# None, None is ok for now
return location if location is not None else filepath
@staticmethod
def _replace_extension(location, extension):
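"""Return `location` with its suffix replaced, e.g. small.bed -> small.fam.
For URLs, the scheme, netloc, query string, and fragment are preserved."""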
if open_bed._is_url(location):
# Split the path and change the extension
path, _ = os.path.splitext(location.path)
new_path = f"{path}.{extension}"
# Create a new ParseResult with the updated path
return UrlParseResult(
scheme=location.scheme,
netloc=location.netloc,
path=new_path,
params=location.params,
query=location.query,
fragment=location.fragment,
)
assert isinstance(location, Path) # real assert
return location.parent / (location.stem + "." + extension)
@staticmethod
def _is_url(location):
return isinstance(location, UrlParseResult)
@staticmethod
def _path_or_url(input):
if isinstance(input, Path):
return input
if isinstance(input, UrlParseResult):
return input
assert isinstance(
input, str,
), "Expected a string or Path object or UrlParseResult"
parsed = urlparse(input)
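# The "://" test below guards against Windows drive letters:
# urlparse("C:/small.bed") reports scheme "c", but only genuine URLs such as
# "https://host/small.bed" contain "://".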
if parsed.scheme and "://" in input:
return parsed
return Path(input)
def read(
self,
index: Optional[Any] = None,
dtype: Optional[Union[type, str]] = "float32",
order: Optional[str] = "F",
force_python_only: Optional[bool] = False,
num_threads=None,
max_concurrent_requests=None,
max_chunk_bytes=None,
) -> np.ndarray:
"""Read genotype information.
Parameters
----------
index:
An optional expression specifying the individuals (samples) and SNPs
(variants) to read. (See examples, below).
Defaults to ``None``, meaning read all.
(If index is a tuple, the first component indexes the individuals and the
second indexes
the SNPs. If it is not a tuple and not None, it indexes SNPs.)
dtype: {'float32' (default), 'float64', 'int8'}, optional
The desired data-type for the returned array.
order : {'F','C'}, optional
The desired memory layout for the returned array.
Defaults to ``F`` (Fortran order, which is SNP-major).
force_python_only: bool, optional
If False (default), uses the faster Rust code; otherwise it uses the slower
pure Python code.
num_threads: None or int, optional
The number of threads with which to read data. Defaults to all available
processors.
Can also be set with :class:`open_bed` or these
environment variables (listed in priority order):
'PST_NUM_THREADS', 'NUM_THREADS', 'MKL_NUM_THREADS'.
max_concurrent_requests: None or int, optional
The maximum number of concurrent requests to make to the cloud storage
service. Defaults to 10.
max_chunk_bytes: None or int, optional
The maximum number of bytes to read in a single request to the cloud
storage service. Defaults to 8MB.
Returns
-------
numpy.ndarray
2-D array containing values of 0, 1, 2, or missing
Rows represent individuals (samples). Columns represent SNPs (variants).
For ``dtype`` 'float32' and 'float64', NaN indicates missing values.
For 'int8', -127 indicates missing values.
Examples
--------
To read all data in a .bed file, set ``index`` to ``None``. This is the default.
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.read())
[[ 1. 0. nan 0.]
[ 2. 0. nan 2.]
[ 0. 1. 2. 0.]]
To read selected individuals (samples) and/or SNPs (variants), set each part of
a :data:`numpy.s_` to an `int`, a list of `int`, a slice expression, or
a list of `bool`.
Negative integers count from the end of the list.
.. doctest::
>>> import numpy as np
>>> bed = open_bed(file_name)
>>> print(bed.read(np.s_[:,2])) # read the SNPs indexed by 2.
[[nan]
[nan]
[ 2.]]
>>> print(bed.read(np.s_[:,[2,3,0]])) # read the SNPs indexed by 2, 3, and 0
[[nan 0. 1.]
[nan 2. 2.]
[ 2. 0. 0.]]
>>> # read SNPs from 1 (inclusive) to 4 (exclusive)
>>> print(bed.read(np.s_[:,1:4]))
[[ 0. nan 0.]
[ 0. nan 2.]
[ 1. 2. 0.]]
>>> print(np.unique(bed.chromosome)) # print unique chrom values
['1' '5' 'Y']
>>> print(bed.read(np.s_[:,bed.chromosome=='5'])) # read all SNPs in chrom 5
[[nan]
[nan]
[ 2.]]
>>> print(bed.read(np.s_[0,:])) # Read 1st individual (across all SNPs)
[[ 1. 0. nan 0.]]
>>> print(bed.read(np.s_[::2,:])) # Read every 2nd individual
[[ 1. 0. nan 0.]
[ 0. 1. 2. 0.]]
>>> #read last and 2nd-to-last individuals and the last SNP
>>> print(bed.read(np.s_[[-1,-2],-1]))
[[0.]
[2.]]
You can give a dtype for the output.
.. doctest::
>>> print(bed.read(dtype='int8'))
[[ 1 0 -127 0]
[ 2 0 -127 2]
[ 0 1 2 0]]
>>> del bed # optional: delete bed object
"""
iid_index_or_slice_etc, sid_index_or_slice_etc = self._split_index(index)
dtype = np.dtype(dtype)
if order not in {"F", "C"}:
msg = f"order '{order}' not known, only 'F', 'C'"
raise ValueError(msg)
# TODO: is caching _iid_range and _sid_range worthwhile, or could they
# be allocated on demand for each call?
if self._iid_range is None:
self._iid_range = np.arange(self.iid_count, dtype="intp")
if self._sid_range is None:
self._sid_range = np.arange(self.sid_count, dtype="intp")
iid_index = np.ascontiguousarray(
self._iid_range[iid_index_or_slice_etc],
dtype="intp",
)
sid_index = np.ascontiguousarray(
self._sid_range[sid_index_or_slice_etc], dtype="intp",
)
if not force_python_only or open_bed._is_url(self.location):
num_threads = get_num_threads(
self._num_threads if num_threads is None else num_threads,
)
max_concurrent_requests = get_max_concurrent_requests(
self._max_concurrent_requests
if max_concurrent_requests is None
else max_concurrent_requests,
)
max_chunk_bytes = get_max_chunk_bytes(
self._max_chunk_bytes if max_chunk_bytes is None else max_chunk_bytes,
)
val = np.zeros((len(iid_index), len(sid_index)), order=order, dtype=dtype)
if self.iid_count > 0 and self.sid_count > 0:
reader, location_str, is_cloud = self._pick_reader(dtype)
if not is_cloud:
reader(
location_str,
self.cloud_options,
iid_count=self.iid_count,
sid_count=self.sid_count,
is_a1_counted=self.count_A1,
iid_index=iid_index,
sid_index=sid_index,
val=val,
num_threads=num_threads,
)
else:
reader(
location_str,
self.cloud_options,
iid_count=self.iid_count,
sid_count=self.sid_count,
is_a1_counted=self.count_A1,
iid_index=iid_index,
sid_index=sid_index,
val=val,
num_threads=num_threads,
max_concurrent_requests=max_concurrent_requests,
max_chunk_bytes=max_chunk_bytes,
)
else:
if not self.count_A1:
byteZero = 0
byteThree = 2
else:
byteZero = 2
byteThree = 0
missing = -127 if dtype == np.int8 else np.nan
# An earlier version of this code could read several consecutive SNPs in
# one file read; that ability may be worth restoring.
# Also, note that reading with pure Python often results in
# non-contiguous memory.
# logging.warn("using pure python plink parser (might be much slower!!)")
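# A .bed file packs four genotypes per byte, two bits each, first genotype
# in the lowest-order bits: 0b00 = homozygous allele 1, 0b01 = missing,
# 0b10 = heterozygous, 0b11 = homozygous allele 2. The threshold tests
# below decode the highest pair first (>= 192, >= 128, >= 64), then np.mod
# strips it so the next pair can be tested.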
if self.major == "SNP":
minor_count = self.iid_count
minor_index = iid_index
major_index = sid_index
else:
minor_count = self.sid_count
minor_index = sid_index
major_index = iid_index
val = np.zeros(
((int(np.ceil(0.25 * minor_count)) * 4), len(major_index)),
order=order,
dtype=dtype,
) # allocate it a little big
nbyte = int(np.ceil(0.25 * minor_count))
with open(self.location, "rb") as filepointer:
# enumerate gives (output column position, source SNP/individual index)
for out_index, source_index in enumerate(major_index):
startbit = int(np.ceil(0.25 * minor_count) * source_index + 3)
filepointer.seek(startbit)
bytes = np.array(bytearray(filepointer.read(nbyte))).reshape(
(int(np.ceil(0.25 * minor_count)), 1), order="F",
)
val[3::4, out_index : out_index + 1] = byteZero
val[3::4, out_index : out_index + 1][bytes >= 64] = missing
val[3::4, out_index : out_index + 1][bytes >= 128] = 1
val[3::4, out_index : out_index + 1][bytes >= 192] = byteThree
bytes = np.mod(bytes, 64)
val[2::4, out_index : out_index + 1] = byteZero
val[2::4, out_index : out_index + 1][bytes >= 16] = missing
val[2::4, out_index : out_index + 1][bytes >= 32] = 1
val[2::4, out_index : out_index + 1][bytes >= 48] = byteThree
bytes = np.mod(bytes, 16)
val[1::4, out_index : out_index + 1] = byteZero
val[1::4, out_index : out_index + 1][bytes >= 4] = missing
val[1::4, out_index : out_index + 1][bytes >= 8] = 1
val[1::4, out_index : out_index + 1][bytes >= 12] = byteThree
bytes = np.mod(bytes, 4)
val[0::4, out_index : out_index + 1] = byteZero
val[0::4, out_index : out_index + 1][bytes >= 1] = missing
val[0::4, out_index : out_index + 1][bytes >= 2] = 1
val[0::4, out_index : out_index + 1][bytes >= 3] = byteThree
val = val[minor_index, :] # reorder or trim any extra allocation
assert val.dtype == np.dtype(dtype) # real assert
if not open_bed._array_properties_are_ok(val, order):
val = val.copy(order=order)
# if in force python mode, and individual-major mode, then we need to transpose
if self.major == "individual":
val = val.T
return val
def _pick_reader(self, dtype):
if dtype == np.int8:
file_reader = read_i8
cloud_reader = read_cloud_i8
elif dtype == np.float64:
file_reader = read_f64
cloud_reader = read_cloud_f64
elif dtype == np.float32:
file_reader = read_f32
cloud_reader = read_cloud_f32
else:
raise ValueError(
f"dtype '{dtype}' not known, only "
+ "'int8', 'float32', and 'float64' are allowed.",
)
if open_bed._is_url(self.location):
reader = cloud_reader
location_str = self.location.geturl()
is_cloud = True
else:
reader = file_reader
location_str = str(self.location.as_posix())
is_cloud = False
return reader, location_str, is_cloud
def __str__(self) -> str:
return f"{self.__class__.__name__}('{self.location}',...)"
@property
def major(self) -> str:
"""Major mode of a local .bed file.
Returns:
-------
str
'SNP' or 'individual'
Almost all PLINK 1.9 .bed files are 'SNP' major. This makes
reading the data by SNP(s) fast.
Raises
------
ValueError
If the file is a cloud file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.major)
SNP
"""
if self._is_url(self.location):
msg = "Cannot determine major mode for cloud files"
raise ValueError(msg)
# if self._mode is not set yet, read it from the file header
if not hasattr(self, "_mode"):
with open(self.location, "rb") as filepointer:
self._mode = self._check_file(filepointer)
return "individual" if self._mode == b"\x00" else "SNP"
@property
def fid(self) -> np.ndarray:
"""Family id of each individual (sample).
Returns:
-------
numpy.ndarray
array of str
'0' represents a missing value.
If needed, will cause a one-time read of the .fam file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.fid)
['fid1' 'fid1' 'fid2']
"""
return self.property_item("fid")
@property
def iid(self) -> np.ndarray:
"""Individual id of each individual (sample).
Returns:
-------
numpy.ndarray
array of str
If needed, will cause a one-time read of the .fam file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.iid)
['iid1' 'iid2' 'iid3']
"""
return self.property_item("iid")
@property
def father(self) -> np.ndarray:
"""Father id of each individual (sample).
Returns:
-------
numpy.ndarray
array of str
'0' represents a missing value.
If needed, will cause a one-time read of the .fam file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.father)
['iid23' 'iid23' 'iid22']
"""
return self.property_item("father")
@property
def mother(self) -> np.ndarray:
"""Mother id of each individual (sample).
Returns:
-------
numpy.ndarray
array of str
'0' represents a missing value.
If needed, will cause a one-time read of the .fam file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.mother)
['iid34' 'iid34' 'iid33']
"""
return self.property_item("mother")
@property
def sex(self) -> np.ndarray:
"""Sex of each individual (sample).
Returns:
-------
numpy.ndarray
array of 0, 1, or 2
0 is unknown, 1 is male, 2 is female
If needed, will cause a one-time read of the .fam file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.sex)
[1 2 0]
"""
return self.property_item("sex")
@property
def pheno(self) -> np.ndarray:
"""A phenotype for each individual (sample)
(seldom used).
Returns:
-------
numpy.ndarray
array of str
'0' may represent a missing value.
If needed, will cause a one-time read of the .fam file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.pheno)
['red' 'red' 'blue']
"""
return self.property_item("pheno")
@property
def properties(self) -> Mapping[str, np.array]:
"""All the properties returned as a dictionary.
Returns:
-------
dict
all the properties
The keys of the dictionary are the names of the properties, namely:
"fid" (family id), "iid" (individual or sample id), "father" (father id),
"mother" (mother id), "sex", "pheno" (phenotype), "chromosome", "sid"
(SNP or variant id), "cm_position" (centimorgan position), "bp_position"
(base-pair position), "allele_1", "allele_2".
The values are :class:`numpy.ndarray`.
If needed, will cause a one-time read of the .fam and .bim files.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(len(bed.properties)) #length of dict
12
"""
for key in _meta_meta:
self.property_item(key)
return self.properties_dict
def property_item(self, name: str) -> np.ndarray:
"""Retrieve one property by name.
Returns:
-------
numpy.ndarray
a property value
The name is one of these:
"fid" (family id), "iid" (individual or sample id), "father" (father id),
"mother" (mother id), "sex", "pheno" (phenotype), "chromosome", "sid"
(SNP or variant id), "cm_position" (centimorgan position), "bp_position"
(base-pair position), "allele_1", "allele_2".
If needed, will cause a one-time read of the .fam or .bim file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.property_item('chromosome'))
['1' '1' '5' 'Y']
"""
if name not in self.properties_dict:
mm = _meta_meta[name]
self._read_fam_or_bim(suffix=mm.suffix)
return self.properties_dict[name]
@property
def chromosome(self) -> np.ndarray:
"""Chromosome of each SNP (variant).
Returns:
-------
numpy.ndarray
array of str
'0' represents a missing value.
If needed, will cause a one-time read of the .bim file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.chromosome)
['1' '1' '5' 'Y']
"""
return self.property_item("chromosome")
@property
def sid(self) -> np.ndarray:
"""SNP id of each SNP (variant).
Returns:
-------
numpy.ndarray
array of str
If needed, will cause a one-time read of the .bim file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.sid)
['sid1' 'sid2' 'sid3' 'sid4']
"""
return self.property_item("sid")
@property
def cm_position(self) -> np.ndarray:
"""Centimorgan position of each SNP (variant).
Returns:
-------
numpy.ndarray
array of float
0.0 represents a missing value.
If needed, will cause a one-time read of the .bim file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.cm_position)
[ 100.4 2000.5 4000.7 7000.9]
"""
return self.property_item("cm_position")
@property
def bp_position(self) -> np.ndarray:
"""Base-pair position of each SNP (variant).
Returns:
-------
numpy.ndarray
array of int
0 represents a missing value.
If needed, will cause a one-time read of the .bim file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.bp_position)
[ 1 100 1000 1004]
"""
return self.property_item("bp_position")
@property
def allele_1(self) -> np.ndarray:
"""First allele of each SNP (variant).
Returns:
-------
numpy.ndarray
array of str
If needed, will cause a one-time read of the .bim file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.allele_1)
['A' 'T' 'A' 'T']
"""
return self.property_item("allele_1")
@property
def allele_2(self) -> np.ndarray:
"""Second allele of each SNP (variant),.
Returns:
-------
numpy.ndarray
array of str
If needed, will cause a one-time read of the .bim file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.allele_2)
['A' 'C' 'C' 'G']
"""
return self.property_item("allele_2")
@property
def iid_count(self) -> int:
"""Number of individuals (samples).
Returns:
-------
int
number of individuals
If needed, will cause a fast line-count of the .fam file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.iid_count)
3
"""
return self._count("fam")
@property
def sid_count(self) -> int:
"""Number of SNPs (variants).
Returns:
-------
int
number of SNPs
If needed, will cause a fast line-count of the .bim file.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.sid_count)
4
"""
return self._count("bim")
def _property_location(self, suffix):
if suffix == "fam":
return self._fam_location
assert suffix == "bim" # real assert
return self._bim_location
def _count(self, suffix):
count = self._counts[suffix]
if count is None:
location = self._property_location(suffix)
if open_bed._is_url(location):
# Avoid downloading the file a second time: when iid/sid are offered,
# derive the count from them.
if suffix == "fam":
if self.property_item("iid") is None:
# iid was suppressed, so download the file and count its lines
file_bytes = bytes(
url_to_bytes(location.geturl(), self.cloud_options),
)
count = _rawincount(BytesIO(file_bytes))
else:
count = len(self.iid)
elif suffix == "bim":
if self.property_item("sid") is None:
# sid was suppressed, so download the file and count its lines
file_bytes = bytes(
url_to_bytes(location.geturl(), self.cloud_options),
)
count = _rawincount(BytesIO(file_bytes))
else:
count = len(self.sid)
else:
# unreachable: suffix is always "fam" or "bim"
msg = f"unexpected suffix '{suffix}'"
raise ValueError(msg)
else:
with open(location, "rb") as f:
count = _rawincount(f)
self._counts[suffix] = count
return count
@staticmethod
def _check_file(filepointer):
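r"""Validate the .bed header and return its mode byte. A valid file starts
with the magic bytes 0x6C 0x1B (b"l\x1b"); the third byte is 0x01 for
SNP-major (the PLINK 1.9 default) or 0x00 for individual-major."""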
magic_number = filepointer.read(2)
if magic_number != b"l\x1b":
msg = "Not a valid .bed file"
raise ValueError(msg)
mode = filepointer.read(1)
# Check if mode is either individual-major or SNP-major
if mode not in (b"\x00", b"\x01"):
msg = "Not a valid .bed file"
raise ValueError(msg)
return mode
def __del__(self) -> None:
self.__exit__()
def __enter__(self):
return self
def __exit__(self, *_):
pass
@staticmethod
def _array_properties_are_ok(val, order):
if order == "F":
return val.flags["F_CONTIGUOUS"]
assert order == "C" # real assert
return val.flags["C_CONTIGUOUS"]
@property
def shape(self):
"""Number of individuals (samples) and SNPs (variants).
Returns:
-------
(int, int)
number of individuals, number of SNPs
If needed, will cause a fast line-count of the .fam and .bim files.
Example:
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.shape)
(3, 4)
"""
return (self.iid_count, self.sid_count)
@staticmethod
def _split_index(index):
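"""Normalize `index` into (iid_index, sid_index). For example, None becomes
(slice(None), slice(None)) (read everything), 2 becomes (slice(None), [2])
(one SNP, all individuals), and (0, slice(None)) becomes ([0], slice(None))
(one individual, all SNPs)."""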
if not isinstance(index, tuple):
index = (None, index)
iid_index = open_bed._fix_up_index(index[0])
sid_index = open_bed._fix_up_index(index[1])
return iid_index, sid_index
@staticmethod
def _fix_up_index(index):
if index is None: # make a shortcut for None
return slice(None)
try: # If index is an int, return it in an array
index = index.__index__() # (see
# https://stackoverflow.com/questions/3501382/checking-whether-a-variable-is-an-integer-or-not)
return [index]
except Exception:
pass
return index
@staticmethod
def _write_fam_or_bim(base_filepath, properties, suffix, property_filepath) -> None:
assert suffix in {"fam", "bim"}, "real assert"
filepath = (
Path(property_filepath)
if property_filepath is not None
else base_filepath.parent / (base_filepath.stem + "." + suffix)
)
fam_bim_list = []
for key, mm in _meta_meta.items():
if mm.suffix == suffix:
assert len(fam_bim_list) == mm.column, "real assert"
fam_bim_list.append(properties[key])
sep = " " if suffix == "fam" else "\t"
with open(filepath, "w") as filepointer:
for index in range(len(fam_bim_list[0])):
filepointer.write(
sep.join(str(seq[index]) for seq in fam_bim_list) + "\n",
)
@staticmethod
def _fix_up_properties_array(input, dtype, missing_value, key):
if input is None:
return None
if len(input) == 0:
return np.zeros([0], dtype=dtype)
if not isinstance(input, np.ndarray):
return open_bed._fix_up_properties_array(
np.array(input), dtype, missing_value, key,
)
if len(input.shape) != 1:
msg = f"{key} should be one dimensional"
raise ValueError(msg)
do_missing_values = True
if np.issubdtype(input.dtype, np.floating) and np.issubdtype(dtype, int):
input[input != input] = missing_value
old_settings = np.seterr(invalid="warn")
try:
output = np.array(input, dtype=dtype)
finally:
np.seterr(**old_settings)
elif not np.issubdtype(input.dtype, dtype):
# This will convert, for example, numerical sids to string sids or
# floats that happen to be integers into ints,
# but there will be a warning generated.
old_settings = np.seterr(invalid="warn")
try:
output = np.array(input, dtype=dtype)
finally:
np.seterr(**old_settings)
else:
output = input
# Change NaN in input to correct missing value
if do_missing_values and np.issubdtype(input.dtype, np.floating):
output[input != input] = missing_value
return output
@staticmethod
def _fix_up_properties(properties, iid_count, sid_count, use_fill_sequence):
for key in properties:
if key not in _meta_meta:
msg = f"properties key '{key}' not known"
raise KeyError(msg)
count_dict = {"fam": iid_count, "bim": sid_count}
properties_dict = {}
for key, mm in _meta_meta.items():
count = count_dict[mm.suffix]
if key not in properties or (use_fill_sequence and properties[key] is None):
if use_fill_sequence:
output = mm.fill_sequence(key, count, mm.missing_value, mm.dtype)
else:
continue # Test coverage reaches this, but doesn't report it.
else:
output = open_bed._fix_up_properties_array(
properties[key], mm.dtype, mm.missing_value, key,
)
if output is not None:
if count is None:
count_dict[mm.suffix] = len(output)
elif count != len(output):
raise ValueError(
f"The length of override {key}, {len(output)}, should not "
+ "be different from the current "
+ f"{_count_name[mm.suffix]}, {count}",
)
properties_dict[key] = output
return properties_dict, count_dict
def _read_fam_or_bim(self, suffix) -> None:
property_location = self._property_location(suffix)
logging.info(f"Loading {suffix} file {property_location}")
count = self._counts[suffix]
delimiter = _delimiters[suffix]
if delimiter in {r"\s+"}:
delimiter = None
usecolsdict = {}
dtype_dict = {}
for key, mm in _meta_meta.items():
if mm.suffix == suffix and key not in self.properties_dict:
usecolsdict[key] = mm.column
dtype_dict[mm.column] = mm.dtype
assert list(usecolsdict.values()) == sorted(usecolsdict.values()) # real assert
assert len(usecolsdict) > 0 # real assert
if self._is_url(property_location):
file_bytes = bytes(
url_to_bytes(property_location.geturl(), self.cloud_options),
)
if len(file_bytes) == 0:
columns, row_count = [], 0
else: # note similar code below
columns, row_count = _read_csv(
BytesIO(file_bytes),
delimiter=delimiter,
dtype=dtype_dict,
usecols=usecolsdict.values(),
)
elif os.path.getsize(property_location) == 0:
columns, row_count = [], 0
else:
columns, row_count = _read_csv(
property_location,
delimiter=delimiter,
dtype=dtype_dict,
usecols=usecolsdict.values(),
)
if count is None:
self._counts[suffix] = row_count
elif count != row_count:
raise ValueError(
f"The number of lines in the *.{suffix} file, {row_count}, "
+ "should not be different from the current "
+ f"{_count_name[suffix]}, {count}",
)
for i, key in enumerate(usecolsdict.keys()):
mm = _meta_meta[key]
if row_count == 0:
output = np.array([], dtype=mm.dtype)
else:
output = columns[i]
if not np.issubdtype(output.dtype, mm.dtype):
output = np.array(output, dtype=mm.dtype)
self.properties_dict[key] = output
def read_sparse(
self,
index: Optional[Any] = None,
dtype: Optional[Union[type, str]] = "float32",
batch_size: Optional[int] = None,
format: Optional[str] = "csc",
num_threads=None,
max_concurrent_requests=None,
max_chunk_bytes=None,
) -> (Union[sparse.csc_matrix, sparse.csr_matrix]) if sparse is not None else None: # type: ignore
"""Read genotype information into a :mod:`scipy.sparse` matrix. Sparse matrices
may be useful when the data is mostly zeros.
.. note::
This method requires :mod:`scipy`. Install `scipy` with:
.. code-block:: bash
pip install --upgrade bed-reader[sparse]
Parameters
----------
index:
An optional expression specifying the individuals (samples) and SNPs
(variants) to read. (See examples, below).
Defaults to ``None``, meaning read all.
(If index is a tuple, the first component indexes the individuals and the
second indexes
the SNPs. If it is not a tuple and not None, it indexes SNPs.)
dtype: {'float32' (default), 'float64', 'int8'}, optional
The desired data-type for the returned array.
batch_size: None or int, optional
Number of dense columns or rows to read at a time, internally.
Defaults to round(sqrt(total-number-of-columns-or-rows-to-read)).
format : {'csc','csr'}, optional
The desired format of the sparse matrix.
Defaults to ``csc`` (Compressed Sparse Column, which is SNP-major).
num_threads: None or int, optional
The number of threads with which to read data. Defaults to all available
processors.
Can also be set with :class:`open_bed` or these
environment variables (listed in priority order):
'PST_NUM_THREADS', 'NUM_THREADS', 'MKL_NUM_THREADS'.
max_concurrent_requests: None or int, optional
The maximum number of concurrent requests to make to the cloud storage
service. Defaults to 10.
max_chunk_bytes: None or int, optional
The maximum number of bytes to read in a single request to the cloud
storage service. Defaults to 8MB.
Returns
-------
a :class:`scipy.sparse.csc_matrix` (default) or :class:`scipy.sparse.csr_matrix`
Rows represent individuals (samples). Columns represent SNPs (variants).
For ``dtype`` 'float32' and 'float64', NaN indicates missing values.
For 'int8', -127 indicates missing values.
The memory used by the final sparse matrix is approximately:
# of non-zero values * (4 bytes for the int32 index + 1 byte per int8 value)
For example, consider reading 1000 individuals (samples) x 50,000 SNPs (variants)
into csc format where the data is 97% sparse.
The memory used will be about 7.5 MB (1000 x 50,000 x 3% x 5 bytes).
This is 15% of the 50 MB needed by a dense matrix.
Internally, the function reads the data via small dense matrices.
For this example, by default, the function will read 1000 individuals x 224 SNPs
(because 224 * 224 is about 50,000).
The memory used by the small dense matrix is 1000 x 224 x 1 byte (for int8) = 0.224 MB.
You can set `batch_size`. Larger values will be faster.
Smaller values will use less memory.
For this example, we might want to set the `batch_size` to 5000. Then,
the memory used by the small dense matrix
would be 1000 x 5000 x 1 byte (for int8) = 5 MB,
similar to the 7.5 MB needed for the final sparse matrix.
Examples
--------
Read all data in a .bed file into a :class:`scipy.sparse.csc_matrix`.
The file has 10 individuals (samples) by 20 SNPs (variants).
All but eight values are 0.
.. doctest::
>>> # pip install bed-reader[samples,sparse] # if needed
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("sparse.bed")
>>> with open_bed(file_name) as bed:
... print(bed.shape) # doctest:+NORMALIZE_WHITESPACE +ELLIPSIS
... val_sparse = bed.read_sparse(dtype="int8")
(10, 20)
>>> print("Nonzero Values", val_sparse.data) # doctest:+NORMALIZE_WHITESPACE +ELLIPSIS
Nonzero Values [1 2 2 1 1 1 1 1]
To read selected individuals (samples) and/or SNPs (variants), set each part of
a :data:`numpy.s_` to an `int`, a list of `int`, a slice expression, or
a list of `bool`.
Negative integers count from the end of the list.
.. doctest::
>>> import numpy as np
>>> bed = open_bed(file_name)
>>> print("Nonzero Values", bed.read_sparse(np.s_[:,5], dtype="int8").data) # read the SNPs indexed by 5.
Nonzero Values [2]
>>> # read the SNPs indexed by 5, 4, and 0
>>> print("Nonzero Values", bed.read_sparse(np.s_[:,[5,4,0]], dtype="int8").data)
Nonzero Values [2 1]
>>> # read SNPs from 1 (inclusive) to 11 (exclusive)
>>> print("Nonzero Values", bed.read_sparse(np.s_[:,1:11], dtype="int8").data)
Nonzero Values [1 2 2 1 1]
>>> print(np.unique(bed.chromosome)) # print unique chrom values
['1' '5' 'Y']
>>> # read all SNPs in chrom 5
>>> print("Nonzero Values", bed.read_sparse(np.s_[:,bed.chromosome=='5'], dtype="int8").data)
Nonzero Values [1 2 2 1 1 1 1 1]
>>> # Read 1st individual (across all SNPs)
>>> print("Nonzero Values", bed.read_sparse(np.s_[0,:], dtype="int8").data)
Nonzero Values [2]
>>> print("Nonzero Values", bed.read_sparse(np.s_[::2,:], dtype="int8").data) # Read every 2nd individual
Nonzero Values [1 2 2 1 1]
>>> # read last and 2nd-to-last individuals and the 15th-from-the-last SNP
>>> print("Nonzero Values", bed.read_sparse(np.s_[[-1,-2],-15], dtype="int8").data)
Nonzero Values [2]
"""
if sparse is None:
raise ImportError(
"The function read_sparse() requires scipy. "
+ "Install it with 'pip install --upgrade bed-reader[sparse]'.",
)
iid_index_or_slice_etc, sid_index_or_slice_etc = self._split_index(index)
dtype = np.dtype(dtype)
# Similar code in read().
# TODO: is caching _iid_range and _sid_range worthwhile, or could they
# be allocated on demand for each call?
if self._iid_range is None:
self._iid_range = np.arange(self.iid_count, dtype="intp")
if self._sid_range is None:
self._sid_range = np.arange(self.sid_count, dtype="intp")
iid_index = np.ascontiguousarray(
self._iid_range[iid_index_or_slice_etc],
dtype="intp",
)
sid_index = np.ascontiguousarray(
self._sid_range[sid_index_or_slice_etc], dtype="intp",
)
if (
len(iid_index) > np.iinfo(np.int32).max
or len(sid_index) > np.iinfo(np.int32).max
):
msg = (
"Too many individuals (samples) or SNPs (variants) requested. "
f"Maximum is {np.iinfo(np.int32).max}."
)
raise ValueError(
msg,
)
if batch_size is None:
batch_size = round(np.sqrt(len(sid_index)))
num_threads = get_num_threads(
self._num_threads if num_threads is None else num_threads,
)
max_concurrent_requests = get_max_concurrent_requests(
self._max_concurrent_requests
if max_concurrent_requests is None
else max_concurrent_requests,
)
max_chunk_bytes = get_max_chunk_bytes(
self._max_chunk_bytes if max_chunk_bytes is None else max_chunk_bytes,
)
if format == "csc":
order = "F"
indptr = np.zeros(len(sid_index) + 1, dtype=np.int32)
elif format == "csr":
order = "C"
indptr = np.zeros(len(iid_index) + 1, dtype=np.int32)
else:
msg = f"format '{format}' not known. Expected 'csc' or 'csr'."
raise ValueError(msg)
# We init data and indices with zero-length arrays to set their dtype.
data = [np.empty(0, dtype=dtype)]
indices = [np.empty(0, dtype=np.int32)]
if self.iid_count > 0 and self.sid_count > 0:
reader, location_str, is_cloud = self._pick_reader(dtype)
if format == "csc":
val = np.zeros((len(iid_index), batch_size), order=order, dtype=dtype)
for batch_start in range(0, len(sid_index), batch_size):
batch_end = batch_start + batch_size
if batch_end > len(sid_index):
batch_end = len(sid_index)
del val
val = np.zeros(
(len(iid_index), batch_end - batch_start),
order=order,
dtype=dtype,
)
batch_slice = np.s_[batch_start:batch_end]
batch_index = sid_index[batch_slice]
if not is_cloud:
reader(
location_str,
self.cloud_options,
iid_count=self.iid_count,
sid_count=self.sid_count,
is_a1_counted=self.count_A1,
iid_index=iid_index,
sid_index=batch_index,
val=val,
num_threads=num_threads,
)
else:
reader(
location_str,
self.cloud_options,
iid_count=self.iid_count,
sid_count=self.sid_count,
is_a1_counted=self.count_A1,
iid_index=iid_index,
sid_index=batch_index,
val=val,
num_threads=num_threads,
max_concurrent_requests=max_concurrent_requests,
max_chunk_bytes=max_chunk_bytes,
)
self.sparsify(
val, order, iid_index, batch_slice, data, indices, indptr,
)
else:
assert format == "csr" # real assert
val = np.zeros((batch_size, len(sid_index)), order=order, dtype=dtype)
for batch_start in range(0, len(iid_index), batch_size):
batch_end = batch_start + batch_size
if batch_end > len(iid_index):
batch_end = len(iid_index)
del val
val = np.zeros(
(batch_end - batch_start, len(sid_index)),
order=order,
dtype=dtype,
)
batch_slice = np.s_[batch_start:batch_end]
batch_index = iid_index[batch_slice]
if not is_cloud:
reader(
location_str,
self.cloud_options,
iid_count=self.iid_count,
sid_count=self.sid_count,
is_a1_counted=self.count_A1,
iid_index=batch_index,
sid_index=sid_index,
val=val,
num_threads=num_threads,
)
else:
reader(
location_str,
self.cloud_options,
iid_count=self.iid_count,
sid_count=self.sid_count,
is_a1_counted=self.count_A1,
iid_index=batch_index,
sid_index=sid_index,
val=val,
num_threads=num_threads,
max_concurrent_requests=max_concurrent_requests,
max_chunk_bytes=max_chunk_bytes,
)
self.sparsify(
val, order, sid_index, batch_slice, data, indices, indptr,
)
data = np.concatenate(data)
indices = np.concatenate(indices)
if format == "csc":
return sparse.csc_matrix(
(data, indices, indptr), (len(iid_index), len(sid_index)),
)
assert format == "csr" # real assert
return sparse.csr_matrix(
(data, indices, indptr), (len(iid_index), len(sid_index)),
)
def sparsify(self, val, order, minor_index, batch_slice, data, indices, indptr) -> None:
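"""Append the nonzero entries of one dense batch to the growing sparse
triplet. `val` is the batch read in `order` ('F' for csc, 'C' for csr),
`minor_index` is the full minor-axis index (iid for csc, sid for csr), and
`batch_slice` locates the batch along the major axis, whose running offsets
accumulate into `indptr`."""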
flatten = np.ravel(val, order=order)
nz_indices = np.flatnonzero(flatten).astype(np.int32)
column_indexes = nz_indices // len(minor_index)
counts = np.bincount(
column_indexes, minlength=batch_slice.stop - batch_slice.start,
).astype(np.int32)
counts_with_initial = np.r_[
indptr[batch_slice.start : batch_slice.start + 1], counts,
]
data.append(flatten[nz_indices])
indices.append(np.mod(nz_indices, len(minor_index)))
indptr[1:][batch_slice] = np.cumsum(counts_with_initial)[1:]
def _read_csv(filepath, delimiter=None, dtype=None, usecols=None):
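"""Read selected columns of a .fam/.bim file with np.loadtxt, returning
(columns, row_count). `usecols` gives the 0-based column indices to keep;
`dtype` maps each such index to its target dtype (indices absent from
`dtype` default to np.str_)."""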
pattern = re.compile(r"^np\.\w+\((.+?)\)$")
# Prepare the usecols by ensuring it is a list of indices
usecols_indices = list(usecols)
transposed = np.loadtxt(
filepath,
dtype=np.str_,
delimiter=delimiter,
usecols=usecols_indices,
unpack=True,
)
if transposed.ndim == 1:
transposed = transposed.reshape(-1, 1)
row_count = transposed.shape[1] # because unpack=True
# Convert column lists to numpy arrays with the specified dtype
columns = []
for output_index, input_index in enumerate(usecols_indices):
col = transposed[output_index]
# work around a NumPy/loadtxt quirk where values can arrive rendered
# like np.str_('...'); strip the wrapper and keep the inner text
if len(col) > 0 and pattern.fullmatch(col[0]):
col = np.array([pattern.fullmatch(x).group(1) for x in col])
# Find the dtype for this column
col_dtype = dtype.get(input_index, np.str_)
# Convert the column list to a numpy array with the specified dtype
columns.append(_convert_to_dtype(col, col_dtype))
return columns, row_count
def _convert_to_dtype(str_arr, dtype):
assert dtype in [np.str_, np.float32, np.int32] # real assert
if dtype == np.str_:
return str_arr
try:
new_arr = str_arr.astype(dtype)
except ValueError as e:
if dtype == np.float32:
raise
# for backwards compatibility, see if intermediate float helps int conversion
try:
assert dtype == np.int32 # real assert
float_arr = str_arr.astype(np.float32)
except ValueError:
raise e
new_arr = float_arr.astype(np.int32)
if not np.array_equal(new_arr, float_arr):
msg = f"invalid literal for int: '{str_arr[np.where(new_arr != float_arr)][:1]}')"
raise ValueError(
msg,
)
return new_arr
if __name__ == "__main__":
import pytest
logging.basicConfig(level=logging.INFO)
pytest.main(["--doctest-modules", __file__])