import logging
import multiprocessing
import os
from dataclasses import dataclass
from io import BytesIO
from itertools import repeat, takewhile
from pathlib import Path
from typing import Any, List, Mapping, Optional, Union
from urllib.parse import ParseResult as UrlParseResult
from urllib.parse import urlparse
import numpy as np
try:
from scipy import sparse
except ImportError:
sparse = None
from .bed_reader import ( # type: ignore
check_file_cloud,
read_cloud_f32,
read_cloud_f64,
read_cloud_i8,
read_f32,
read_f64,
read_i8,
url_to_bytes,
)
# https://stackoverflow.com/questions/845058/how-to-get-line-count-of-a-large-file-cheaply-in-python
def _rawincount(f):
f.seek(0)
bufgen = takewhile(lambda x: x, (f.read(1024 * 1024) for _ in repeat(None)))
return sum(buf.count(b"\n") for buf in bufgen)
@dataclass
class _MetaMeta:
suffix: str
column: int
dtype: type
missing_value: object
fill_sequence: object
def _all_same(key, length, missing, dtype):
if np.issubdtype(dtype, np.str_):
dtype = f"<U{len(missing)}"
return np.full(length, missing, dtype=dtype)
def _sequence(key, length, missing, dtype):
if np.issubdtype(dtype, np.str_):
longest = len(f"{key}{length}")
dtype = f"<U{longest}"
return np.fromiter(
(f"{key}{i + 1}" for i in range(length)), dtype=dtype, count=length
)
_delimiters = {"fam": r"\s+", "bim": "\t"}
_count_name = {"fam": "iid_count", "bim": "sid_count"}
_meta_meta = {
# https://stackoverflow.com/questions/41921255/staticmethod-object-is-not-callable
"fid": _MetaMeta("fam", 0, np.str_, "0", _all_same),
"iid": _MetaMeta("fam", 1, np.str_, None, _sequence),
"father": _MetaMeta("fam", 2, np.str_, "0", _all_same),
"mother": _MetaMeta("fam", 3, np.str_, "0", _all_same),
"sex": _MetaMeta("fam", 4, np.int32, 0, _all_same),
"pheno": _MetaMeta("fam", 5, np.str_, "0", _all_same),
"chromosome": _MetaMeta("bim", 0, np.str_, "0", _all_same),
"sid": _MetaMeta("bim", 1, np.str_, None, _sequence),
"cm_position": _MetaMeta("bim", 2, np.float32, 0, _all_same),
"bp_position": _MetaMeta("bim", 3, np.int32, 0, _all_same),
"allele_1": _MetaMeta("bim", 4, np.str_, "A1", _all_same),
"allele_2": _MetaMeta("bim", 5, np.str_, "A2", _all_same),
}
def get_num_threads(num_threads=None):
if num_threads is not None:
return num_threads
if "PST_NUM_THREADS" in os.environ:
return int(os.environ["PST_NUM_THREADS"])
if "NUM_THREADS" in os.environ:
return int(os.environ["NUM_THREADS"])
if "MKL_NUM_THREADS" in os.environ:
return int(os.environ["MKL_NUM_THREADS"])
return multiprocessing.cpu_count()
def get_max_concurrent_requests(max_concurrent_requests=None):
if max_concurrent_requests is not None:
return max_concurrent_requests
return 10
def get_max_chunk_bytes(max_chunk_bytes=None):
if max_chunk_bytes is not None:
return max_chunk_bytes
return 8_000_000
[docs]
class open_bed:
"""
Open a PLINK .bed file, local or cloud, for reading.
Parameters
----------
location: pathlib.Path or str
File path or URL to the .bed file. See :doc:`cloud_urls` for details on cloud URLs.
iid_count: None or int, optional
Number of individuals (samples) in the .bed file.
The default (``iid_count=None``) finds the number
automatically by quickly scanning the .fam file.
sid_count: None or int, optional
Number of SNPs (variants) in the .bed file.
The default (``sid_count=None``) finds the number
automatically by quickly scanning the .bim file.
properties: dict, optional
A dictionary of any replacement properties. The default is an empty dictionary.
The keys of the dictionary are the names of the properties to replace.
The possible keys are:
"fid" (family id), "iid" (individual or sample id), "father" (father id),
"mother" (mother id), "sex", "pheno" (phenotype), "chromosome", "sid"
(SNP or variant id), "cm_position" (centimorgan position), "bp_position"
(base-pair position), "allele_1", "allele_2".
The values are replacement lists or arrays. A value can also be `None`,
meaning do not read or offer this property. See examples, below.
The list or array will be converted to a :class:`numpy.ndarray`
of the appropriate dtype, if necessary. Any :data:`numpy.nan` values
will converted to the appropriate missing value. The PLINK `.fam specification
<https://www.cog-genomics.org/plink2/formats#fam>`_
and `.bim specification <https://www.cog-genomics.org/plink2/formats#bim>`_
lists the dtypes and missing values for each property.
count_A1: bool, optional
True (default) to count the number of A1 alleles (the PLINK standard).
False to count the number of A2 alleles.
num_threads: None or int, optional
The number of threads with which to read data. Defaults to all available
processors.
Can also be set with these environment variables (listed in priority order):
'PST_NUM_THREADS', 'NUM_THREADS', 'MKL_NUM_THREADS'.
skip_format_check: bool, optional
False (default) to immediately check for expected starting bytes in
the .bed file. True to delay the check until (and if) data is read.
fam_location: pathlib.Path or str or URL, optional
Path to the file containing information about each individual (sample).
Defaults to replacing the .bed file’s suffix with .fam.
bim_location: pathlib.Path or str URL, optional
Path to the file containing information about each SNP (variant).
Defaults to replacing the .bed file’s suffix with .bim.
cloud_options: dict, optional
A dictionary of options for reading from cloud storage. The default is an empty.
max_concurrent_requests: None or int, optional
The maximum number of concurrent requests to make to the cloud storage service.
Defaults to 10.
max_chunk_bytes: None or int, optional
The maximum number of bytes to read in a single request to the cloud storage
service. Defaults to 8MB.
filepath: same as location
Deprecated. Use location instead.
fam_filepath: same as fam_location
Deprecated. Use fam_location instead.
bim_filepath: same as bim_location
Deprecated. Use bim_location instead.
Returns
-------
open_bed
an open_bed object
Examples
--------
Open a local file and list individual (sample) :attr:`iid` and SNP (variant) :attr:`sid`. Then, :meth:`read`
the whole file.
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> bed = open_bed(file_name)
>>> print(bed.iid)
['iid1' 'iid2' 'iid3']
>>> print(bed.sid)
['sid1' 'sid2' 'sid3' 'sid4']
>>> print(bed.read())
[[ 1. 0. nan 0.]
[ 2. 0. nan 2.]
[ 0. 1. 2. 0.]]
>>> del bed # optional: delete bed object
Open a cloud file with a non-default timeout.
Then, read the data for one SNP (variant)
at index position 2.
See :doc:`cloud_urls` for details on reading files from cloud storage.
.. doctest::
>>> import numpy as np
>>> with open_bed("https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/small.bed",
... cloud_options={"timeout": "10s"}) as bed:
... print(bed.read(np.s_[:,2]))
[[nan]
[nan]
[ 2.]]
With the local file, replace :attr:`iid`.
>>> bed = open_bed(file_name, properties={"iid":["sample1","sample2","sample3"]})
>>> print(bed.iid) # replaced
['sample1' 'sample2' 'sample3']
>>> print(bed.sid) # same as before
['sid1' 'sid2' 'sid3' 'sid4']
Give the number of individuals (samples) and SNPs (variants) so that the .fam and
.bim files need never be opened.
>>> with open_bed(file_name, iid_count=3, sid_count=4) as bed:
... print(bed.read())
[[ 1. 0. nan 0.]
[ 2. 0. nan 2.]
[ 0. 1. 2. 0.]]
Mark some properties as "don’t read or offer".
>>> bed = open_bed(file_name, properties={
... "father" : None, "mother" : None, "sex" : None, "pheno" : None,
... "allele_1" : None, "allele_2":None })
>>> print(bed.iid) # read from file
['iid1' 'iid2' 'iid3']
>>> print(bed.allele_2) # not read and not offered
None
See the :meth:`read` for details of reading batches via slicing and fancy indexing.
"""
def __init__(
self,
location: Union[str, Path, UrlParseResult],
iid_count: Optional[int] = None,
sid_count: Optional[int] = None,
properties: Mapping[str, List[Any]] = {},
count_A1: bool = True,
num_threads: Optional[int] = None,
skip_format_check: bool = False,
fam_location: Union[str, Path, UrlParseResult] = None,
bim_location: Union[str, Path, UrlParseResult] = None,
cloud_options: Mapping[str, str] = {},
max_concurrent_requests: Optional[int] = None,
max_chunk_bytes: Optional[int] = None,
# accept old keywords
filepath: Union[str, Path] = None,
fam_filepath: Union[str, Path] = None,
bim_filepath: Union[str, Path] = None,
):
location = self._combined(location, filepath, "location", "filepath")
fam_location = self._combined(
fam_location, fam_filepath, "fam_location", "fam_filepath"
)
bim_location = self._combined(
bim_location, bim_filepath, "bim_location", "bim_filepath"
)
self.location = self._path_or_url(location)
self.cloud_options = cloud_options
self.count_A1 = count_A1
self._num_threads = num_threads
self._max_concurrent_requests = max_concurrent_requests
self._max_chunk_bytes = max_chunk_bytes
self.skip_format_check = skip_format_check
self._fam_location = (
self._path_or_url(fam_location)
if fam_location is not None
else self._replace_extension(self.location, "fam")
)
self._bim_location = (
self._path_or_url(bim_location)
if bim_location is not None
else self._replace_extension(self.location, "bim")
)
self.properties_dict, self._counts = open_bed._fix_up_properties(
properties, iid_count, sid_count, use_fill_sequence=False
)
self._iid_range = None
self._sid_range = None
if not self.skip_format_check:
if self._is_url(self.location):
check_file_cloud(self.location.geturl(), self.cloud_options)
else:
with open(self.location, "rb") as filepointer:
self._check_file(filepointer)
# # its an error to set both location and filepath
# location = self._combined(location, filepath, "location", "filepath")
# fam_location = self._combined(fam_location, fam_filepath, "fam_location", "fam_filepath")
# bim_location = self._combined(bim_location, bim_filepath, "bim_location", "bim_filepath")
@staticmethod
def _combined(location, filepath, location_name, filepath_name):
if location is not None and filepath is not None:
raise ValueError(f"Cannot set both {location_name} and {filepath_name}")
# None, None is ok for now
return location if location is not None else filepath
@staticmethod
def _replace_extension(location, extension):
if open_bed._is_url(location):
# Split the path and change the extension
path, _ = os.path.splitext(location.path)
new_path = f"{path}.{extension}"
# Create a new ParseResult with the updated path
new_parse_result = UrlParseResult(
scheme=location.scheme,
netloc=location.netloc,
path=new_path,
params=location.params,
query=location.query,
fragment=location.fragment,
)
return new_parse_result
else:
assert isinstance(location, Path) # real assert
return location.parent / (location.stem + "." + extension)
@staticmethod
def _is_url(location):
return isinstance(location, UrlParseResult)
@staticmethod
def _path_or_url(input):
if isinstance(input, Path):
return input
if isinstance(input, UrlParseResult):
return input
assert isinstance(
input, str
), "Expected a string or Path object or UrlParseResult"
parsed = urlparse(input)
if parsed.scheme and "://" in input:
return parsed
else:
return Path(input)
[docs]
def read(
self,
index: Optional[Any] = None,
dtype: Optional[Union[type, str]] = "float32",
order: Optional[str] = "F",
force_python_only: Optional[bool] = False,
num_threads=None,
max_concurrent_requests=None,
max_chunk_bytes=None,
) -> np.ndarray:
"""
Read genotype information.
Parameters
----------
index:
An optional expression specifying the individuals (samples) and SNPs
(variants) to read. (See examples, below).
Defaults to ``None``, meaning read all.
(If index is a tuple, the first component indexes the individuals and the
second indexes
the SNPs. If it is not a tuple and not None, it indexes SNPs.)
dtype: {'float32' (default), 'float64', 'int8'}, optional
The desired data-type for the returned array.
order : {'F','C'}, optional
The desired memory layout for the returned array.
Defaults to ``F`` (Fortran order, which is SNP-major).
force_python_only: bool, optional
If False (default), uses the faster Rust code; otherwise it uses the slower
pure Python code.
num_threads: None or int, optional
The number of threads with which to read data. Defaults to all available
processors.
Can also be set with :class:`open_bed` or these
environment variables (listed in priority order):
'PST_NUM_THREADS', 'NUM_THREADS', 'MKL_NUM_THREADS'.
max_concurrent_requests: None or int, optional
The maximum number of concurrent requests to make to the cloud storage
service. Defaults to 10.
max_chunk_bytes: None or int, optional
The maximum number of bytes to read in a single request to the cloud
storage service. Defaults to 8MB.
Returns
-------
numpy.ndarray
2-D array containing values of 0, 1, 2, or missing
Rows represent individuals (samples). Columns represent SNPs (variants).
For ``dtype`` 'float32' and 'float64', NaN indicates missing values.
For 'int8', -127 indicates missing values.
Examples
--------
To read all data in a .bed file, set ``index`` to ``None``. This is the default.
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.read())
[[ 1. 0. nan 0.]
[ 2. 0. nan 2.]
[ 0. 1. 2. 0.]]
To read selected individuals (samples) and/or SNPs (variants), set each part of
a :data:`numpy.s_` to an `int`, a list of `int`, a slice expression, or
a list of `bool`.
Negative integers count from the end of the list.
.. doctest::
>>> import numpy as np
>>> bed = open_bed(file_name)
>>> print(bed.read(np.s_[:,2])) # read the SNPs indexed by 2.
[[nan]
[nan]
[ 2.]]
>>> print(bed.read(np.s_[:,[2,3,0]])) # read the SNPs indexed by 2, 3, and 0
[[nan 0. 1.]
[nan 2. 2.]
[ 2. 0. 0.]]
>>> # read SNPs from 1 (inclusive) to 4 (exclusive)
>>> print(bed.read(np.s_[:,1:4]))
[[ 0. nan 0.]
[ 0. nan 2.]
[ 1. 2. 0.]]
>>> print(np.unique(bed.chromosome)) # print unique chrom values
['1' '5' 'Y']
>>> print(bed.read(np.s_[:,bed.chromosome=='5'])) # read all SNPs in chrom 5
[[nan]
[nan]
[ 2.]]
>>> print(bed.read(np.s_[0,:])) # Read 1st individual (across all SNPs)
[[ 1. 0. nan 0.]]
>>> print(bed.read(np.s_[::2,:])) # Read every 2nd individual
[[ 1. 0. nan 0.]
[ 0. 1. 2. 0.]]
>>> #read last and 2nd-to-last individuals and the last SNPs
>>> print(bed.read(np.s_[[-1,-2],-1]))
[[0.]
[2.]]
You can give a dtype for the output.
.. doctest::
>>> print(bed.read(dtype='int8'))
[[ 1 0 -127 0]
[ 2 0 -127 2]
[ 0 1 2 0]]
>>> del bed # optional: delete bed object
"""
iid_index_or_slice_etc, sid_index_or_slice_etc = self._split_index(index)
dtype = np.dtype(dtype)
if order not in {"F", "C"}:
raise ValueError(f"order '{order}' not known, only 'F', 'C'")
# Later happy with _iid_range and _sid_range or could it be done with
# allocation them?
if self._iid_range is None:
self._iid_range = np.arange(self.iid_count, dtype="intp")
if self._sid_range is None:
self._sid_range = np.arange(self.sid_count, dtype="intp")
iid_index = np.ascontiguousarray(
self._iid_range[iid_index_or_slice_etc],
dtype="intp",
)
sid_index = np.ascontiguousarray(
self._sid_range[sid_index_or_slice_etc], dtype="intp"
)
if not force_python_only or open_bed._is_url(self.location):
num_threads = get_num_threads(
self._num_threads if num_threads is None else num_threads
)
max_concurrent_requests = get_max_concurrent_requests(
self._max_concurrent_requests
if max_concurrent_requests is None
else max_concurrent_requests
)
max_chunk_bytes = get_max_chunk_bytes(
self._max_chunk_bytes if max_chunk_bytes is None else max_chunk_bytes
)
val = np.zeros((len(iid_index), len(sid_index)), order=order, dtype=dtype)
if self.iid_count > 0 and self.sid_count > 0:
reader, location_str, is_cloud = self._pick_reader(dtype)
if not is_cloud:
reader(
location_str,
self.cloud_options,
iid_count=self.iid_count,
sid_count=self.sid_count,
is_a1_counted=self.count_A1,
iid_index=iid_index,
sid_index=sid_index,
val=val,
num_threads=num_threads,
)
else:
reader(
location_str,
self.cloud_options,
iid_count=self.iid_count,
sid_count=self.sid_count,
is_a1_counted=self.count_A1,
iid_index=iid_index,
sid_index=sid_index,
val=val,
num_threads=num_threads,
max_concurrent_requests=max_concurrent_requests,
max_chunk_bytes=max_chunk_bytes,
)
else:
if not self.count_A1:
byteZero = 0
byteThree = 2
else:
byteZero = 2
byteThree = 0
if dtype == np.int8:
missing = -127
else:
missing = np.nan
# An earlier version of this code had a way to read consecutive SNPs of code
# in one read. May want
# to add that ability back to the code.
# Also, note that reading with python will often result in
# non-contiguous memory
# logging.warn("using pure python plink parser (might be much slower!!)")
val = np.zeros(
((int(np.ceil(0.25 * self.iid_count)) * 4), len(sid_index)),
order=order,
dtype=dtype,
) # allocate it a little big
nbyte = int(np.ceil(0.25 * self.iid_count))
with open(self.location, "rb") as filepointer:
for SNPsIndex, bimIndex in enumerate(sid_index):
startbit = int(np.ceil(0.25 * self.iid_count) * bimIndex + 3)
filepointer.seek(startbit)
bytes = np.array(bytearray(filepointer.read(nbyte))).reshape(
(int(np.ceil(0.25 * self.iid_count)), 1), order="F"
)
val[3::4, SNPsIndex : SNPsIndex + 1] = byteZero
val[3::4, SNPsIndex : SNPsIndex + 1][bytes >= 64] = missing
val[3::4, SNPsIndex : SNPsIndex + 1][bytes >= 128] = 1
val[3::4, SNPsIndex : SNPsIndex + 1][bytes >= 192] = byteThree
bytes = np.mod(bytes, 64)
val[2::4, SNPsIndex : SNPsIndex + 1] = byteZero
val[2::4, SNPsIndex : SNPsIndex + 1][bytes >= 16] = missing
val[2::4, SNPsIndex : SNPsIndex + 1][bytes >= 32] = 1
val[2::4, SNPsIndex : SNPsIndex + 1][bytes >= 48] = byteThree
bytes = np.mod(bytes, 16)
val[1::4, SNPsIndex : SNPsIndex + 1] = byteZero
val[1::4, SNPsIndex : SNPsIndex + 1][bytes >= 4] = missing
val[1::4, SNPsIndex : SNPsIndex + 1][bytes >= 8] = 1
val[1::4, SNPsIndex : SNPsIndex + 1][bytes >= 12] = byteThree
bytes = np.mod(bytes, 4)
val[0::4, SNPsIndex : SNPsIndex + 1] = byteZero
val[0::4, SNPsIndex : SNPsIndex + 1][bytes >= 1] = missing
val[0::4, SNPsIndex : SNPsIndex + 1][bytes >= 2] = 1
val[0::4, SNPsIndex : SNPsIndex + 1][bytes >= 3] = byteThree
val = val[iid_index, :] # reorder or trim any extra allocation
assert val.dtype == np.dtype(dtype) # real assert
if not open_bed._array_properties_are_ok(val, order):
val = val.copy(order=order)
return val
def _pick_reader(self, dtype):
if dtype == np.int8:
file_reader = read_i8
cloud_reader = read_cloud_i8
elif dtype == np.float64:
file_reader = read_f64
cloud_reader = read_cloud_f64
elif dtype == np.float32:
file_reader = read_f32
cloud_reader = read_cloud_f32
else:
raise ValueError(
f"dtype '{dtype}' not known, only "
+ "'int8', 'float32', and 'float64' are allowed."
)
if open_bed._is_url(self.location):
reader = cloud_reader
location_str = self.location.geturl()
is_cloud = True
else:
reader = file_reader
location_str = str(self.location.as_posix())
is_cloud = False
return reader, location_str, is_cloud
def __str__(self) -> str:
return f"{self.__class__.__name__}('{self.location}',...)"
@property
def fid(self) -> np.ndarray:
"""
Family id of each individual (sample).
Returns
-------
numpy.ndarray
array of str
'0' represents a missing value.
If needed, will cause a one-time read of the .fam file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.fid)
['fid1' 'fid1' 'fid2']
"""
return self.property_item("fid")
@property
def iid(self) -> np.ndarray:
"""
Individual id of each individual (sample).
Returns
-------
numpy.ndarray
array of str
If needed, will cause a one-time read of the .fam file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.iid)
['iid1' 'iid2' 'iid3']
"""
return self.property_item("iid")
@property
def father(self) -> np.ndarray:
"""
Father id of each individual (sample).
Returns
-------
numpy.ndarray
array of str
'0' represents a missing value.
If needed, will cause a one-time read of the .fam file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.father)
['iid23' 'iid23' 'iid22']
"""
return self.property_item("father")
@property
def mother(self) -> np.ndarray:
"""
Mother id of each individual (sample).
Returns
-------
numpy.ndarray
array of str
'0' represents a missing value.
If needed, will cause a one-time read of the .fam file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.mother)
['iid34' 'iid34' 'iid33']
"""
return self.property_item("mother")
@property
def sex(self) -> np.ndarray:
"""
Sex of each individual (sample).
Returns
-------
numpy.ndarray
array of 0, 1, or 2
0 is unknown, 1 is male, 2 is female
If needed, will cause a one-time read of the .fam file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.sex)
[1 2 0]
"""
return self.property_item("sex")
@property
def pheno(self) -> np.ndarray:
"""
A phenotype for each individual (sample)
(seldom used).
Returns
-------
numpy.ndarray
array of str
'0' may represent a missing value.
If needed, will cause a one-time read of the .fam file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.pheno)
['red' 'red' 'blue']
"""
return self.property_item("pheno")
@property
def properties(self) -> Mapping[str, np.array]:
"""
All the properties returned as a dictionary.
Returns
-------
dict
all the properties
The keys of the dictionary are the names of the properties, namely:
"fid" (family id), "iid" (individual or sample id), "father" (father id),
"mother" (mother id), "sex", "pheno" (phenotype), "chromosome", "sid"
(SNP or variant id), "cm_position" (centimorgan position), "bp_position"
(base-pair position), "allele_1", "allele_2".
The values are :class:`numpy.ndarray`.
If needed, will cause a one-time read of the .fam and .bim file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(len(bed.properties)) #length of dict
12
"""
for key in _meta_meta:
self.property_item(key)
return self.properties_dict
[docs]
def property_item(self, name: str) -> np.ndarray:
"""
Retrieve one property by name.
Returns
-------
numpy.ndarray
a property value
The name is one of these:
"fid" (family id), "iid" (individual or sample id), "father" (father id),
"mother" (mother id), "sex", "pheno" (phenotype), "chromosome", "sid"
(SNP or variant id), "cm_position" (centimorgan position), "bp_position"
(base-pair position), "allele_1", "allele_2".
If needed, will cause a one-time read of the .fam or .bim file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.property_item('chromosome'))
['1' '1' '5' 'Y']
"""
if name not in self.properties_dict:
mm = _meta_meta[name]
self._read_fam_or_bim(suffix=mm.suffix)
return self.properties_dict[name]
@property
def chromosome(self) -> np.ndarray:
"""
Chromosome of each SNP (variant)
Returns
-------
numpy.ndarray
array of str
'0' represents a missing value.
If needed, will cause a one-time read of the .bim file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.chromosome)
['1' '1' '5' 'Y']
"""
return self.property_item("chromosome")
@property
def sid(self) -> np.ndarray:
"""
SNP id of each SNP (variant).
Returns
-------
numpy.ndarray
array of str
If needed, will cause a one-time read of the .bim file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.sid)
['sid1' 'sid2' 'sid3' 'sid4']
"""
return self.property_item("sid")
@property
def cm_position(self) -> np.ndarray:
"""
Centimorgan position of each SNP (variant).
Returns
-------
numpy.ndarray
array of float
0.0 represents a missing value.
If needed, will cause a one-time read of the .bim file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.cm_position)
[ 100.4 2000.5 4000.7 7000.9]
"""
return self.property_item("cm_position")
@property
def bp_position(self) -> np.ndarray:
"""
Base-pair position of each SNP (variant).
Returns
-------
numpy.ndarray
array of int
0 represents a missing value.
If needed, will cause a one-time read of the .bim file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.bp_position)
[ 1 100 1000 1004]
"""
return self.property_item("bp_position")
@property
def allele_1(self) -> np.ndarray:
"""
First allele of each SNP (variant).
Returns
-------
numpy.ndarray
array of str
If needed, will cause a one-time read of the .bim file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.allele_1)
['A' 'T' 'A' 'T']
"""
return self.property_item("allele_1")
@property
def allele_2(self) -> np.ndarray:
"""
Second allele of each SNP (variant),
Returns
-------
numpy.ndarray
array of str
If needed, will cause a one-time read of the .bim file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.allele_2)
['A' 'C' 'C' 'G']
"""
return self.property_item("allele_2")
@property
def iid_count(self) -> np.ndarray:
"""
Number of individuals (samples).
Returns
-------
int
number of individuals
If needed, will cause a fast line-count of the .fam file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.iid_count)
3
"""
return self._count("fam")
@property
def sid_count(self) -> np.ndarray:
"""
Number of SNPs (variants).
Returns
-------
int
number of SNPs
If needed, will cause a fast line-count of the .bim file.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.sid_count)
4
"""
return self._count("bim")
def _property_location(self, suffix):
if suffix == "fam":
return self._fam_location
else:
assert suffix == "bim" # real assert
return self._bim_location
def _count(self, suffix):
count = self._counts[suffix]
if count is None:
location = self._property_location(suffix)
if open_bed._is_url(location):
# should not download twice from cloud
if suffix == "fam":
if self.property_item("iid") is None:
# ... unless user doesn't want iid
file_bytes = bytes(
url_to_bytes(location.geturl(), self.cloud_options)
)
count = _rawincount(BytesIO(file_bytes))
else:
count = len(self.iid)
elif suffix == "bim":
if self.property_item("sid") is None:
# ... unless user doesn't want sid
file_bytes = bytes(
url_to_bytes(location.geturl(), self.cloud_options)
)
count = _rawincount(BytesIO(file_bytes))
else:
count = len(self.sid)
else:
raise ValueError("real assert")
else:
count = _rawincount(open(location, "rb"))
self._counts[suffix] = count
return count
@staticmethod
def _check_file(filepointer):
mode = filepointer.read(2)
if mode != b"l\x1b":
raise ValueError("Not a valid .bed file")
mode = filepointer.read(1) # \x01 = SNP major \x00 = individual major
if mode != b"\x01":
raise ValueError("only SNP-major is implemented")
def __del__(self):
self.__exit__()
def __enter__(self):
return self
def __exit__(self, *_):
pass
@staticmethod
def _array_properties_are_ok(val, order):
if order == "F":
return val.flags["F_CONTIGUOUS"]
else:
assert order == "C" # real assert
return val.flags["C_CONTIGUOUS"]
@property
def shape(self):
"""
Number of individuals (samples) and SNPs (variants).
Returns
-------
(int, int)
number of individuals, number of SNPs
If needed, will cause a fast line-count of the .fam and .bim files.
Example
-------
.. doctest::
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("small.bed")
>>> with open_bed(file_name) as bed:
... print(bed.shape)
(3, 4)
"""
return (self.iid_count, self.sid_count)
@staticmethod
def _split_index(index):
if not isinstance(index, tuple):
index = (None, index)
iid_index = open_bed._fix_up_index(index[0])
sid_index = open_bed._fix_up_index(index[1])
return iid_index, sid_index
@staticmethod
def _fix_up_index(index):
if index is None: # make a shortcut for None
return slice(None)
try: # If index is an int, return it in an array
index = index.__index__() # (see
# https://stackoverflow.com/questions/3501382/checking-whether-a-variable-is-an-integer-or-not)
return [index]
except Exception:
pass
return index
@staticmethod
def _write_fam_or_bim(base_filepath, properties, suffix, property_filepath):
assert suffix in {"fam", "bim"}, "real assert"
filepath = (
Path(property_filepath)
if property_filepath is not None
else base_filepath.parent / (base_filepath.stem + "." + suffix)
)
fam_bim_list = []
for key, mm in _meta_meta.items():
if mm.suffix == suffix:
assert len(fam_bim_list) == mm.column, "real assert"
fam_bim_list.append(properties[key])
sep = " " if suffix == "fam" else "\t"
with open(filepath, "w") as filepointer:
for index in range(len(fam_bim_list[0])):
filepointer.write(
sep.join(str(seq[index]) for seq in fam_bim_list) + "\n"
)
@staticmethod
def _fix_up_properties_array(input, dtype, missing_value, key):
if input is None:
return None
if len(input) == 0:
return np.zeros([0], dtype=dtype)
if not isinstance(input, np.ndarray):
return open_bed._fix_up_properties_array(
np.array(input), dtype, missing_value, key
)
if len(input.shape) != 1:
raise ValueError(f"{key} should be one dimensional")
if not np.issubdtype(input.dtype, dtype):
# This will convert, for example, numerical sids to string sids or
# floats that happen to be integers into ints,
# but there will be a warning generated.
output = np.array(input, dtype=dtype)
else:
output = input
# Change NaN in input to correct missing value
if np.issubdtype(input.dtype, np.floating):
output[input != input] = missing_value
return output
@staticmethod
def _fix_up_properties(properties, iid_count, sid_count, use_fill_sequence):
for key in properties:
if key not in _meta_meta:
raise KeyError(f"properties key '{key}' not known")
count_dict = {"fam": iid_count, "bim": sid_count}
properties_dict = {}
for key, mm in _meta_meta.items():
count = count_dict[mm.suffix]
if key not in properties or (use_fill_sequence and properties[key] is None):
if use_fill_sequence:
output = mm.fill_sequence(key, count, mm.missing_value, mm.dtype)
else:
continue # Test coverage reaches this, but doesn't report it.
else:
output = open_bed._fix_up_properties_array(
properties[key], mm.dtype, mm.missing_value, key
)
if output is not None:
if count is None:
count_dict[mm.suffix] = len(output)
else:
if count != len(output):
raise ValueError(
f"The length of override {key}, {len(output)}, should not "
+ "be different from the current "
+ f"{_count_name[mm.suffix]}, {count}"
)
properties_dict[key] = output
return properties_dict, count_dict
def _read_fam_or_bim(self, suffix):
property_location = self._property_location(suffix)
logging.info("Loading {0} file {1}".format(suffix, property_location))
count = self._counts[suffix]
delimiter = _delimiters[suffix]
if delimiter in {r"\s+"}:
delimiter = None
usecolsdict = {}
dtype_dict = {}
for key, mm in _meta_meta.items():
if mm.suffix is suffix and key not in self.properties_dict:
usecolsdict[key] = mm.column
dtype_dict[mm.column] = mm.dtype
assert list(usecolsdict.values()) == sorted(usecolsdict.values()) # real assert
assert len(usecolsdict) > 0 # real assert
if self._is_url(property_location):
file_bytes = bytes(
url_to_bytes(property_location.geturl(), self.cloud_options)
)
if len(file_bytes) == 0:
columns, row_count = [], 0
else: # note similar code below
columns, row_count = _read_csv(
BytesIO(file_bytes),
delimiter=delimiter,
dtype=dtype_dict,
usecols=usecolsdict.values(),
)
else:
if os.path.getsize(property_location) == 0:
columns, row_count = [], 0
else:
columns, row_count = _read_csv(
property_location,
delimiter=delimiter,
dtype=dtype_dict,
usecols=usecolsdict.values(),
)
if count is None:
self._counts[suffix] = row_count
else:
if count != row_count:
raise ValueError(
f"The number of lines in the *.{suffix} file, {row_count}, "
+ "should not be different from the current "
+ "f{_count_name[suffix]}, {count}"
)
for i, key in enumerate(usecolsdict.keys()):
mm = _meta_meta[key]
if row_count == 0:
output = np.array([], dtype=mm.dtype)
else:
output = columns[i]
if not np.issubdtype(output.dtype, mm.dtype):
output = np.array(output, dtype=mm.dtype)
self.properties_dict[key] = output
[docs]
def read_sparse(
self,
index: Optional[Any] = None,
dtype: Optional[Union[type, str]] = "float32",
batch_size: Optional[int] = None,
format: Optional[str] = "csc",
num_threads=None,
max_concurrent_requests=None,
max_chunk_bytes=None,
) -> (Union[sparse.csc_matrix, sparse.csr_matrix]) if sparse is not None else None:
"""
Read genotype information into a :mod:`scipy.sparse` matrix. Sparse matrices
may be useful when the data is mostly zeros.
.. note::
This method requires :mod:`scipy`. Install `scipy` with:
.. code-block:: bash
pip install --upgrade bed-reader[sparse]
Parameters
----------
index:
An optional expression specifying the individuals (samples) and SNPs
(variants) to read. (See examples, below).
Defaults to ``None``, meaning read all.
(If index is a tuple, the first component indexes the individuals and the
second indexes
the SNPs. If it is not a tuple and not None, it indexes SNPs.)
dtype: {'float32' (default), 'float64', 'int8'}, optional
The desired data-type for the returned array.
batch_size: None or int, optional
Number of dense columns or rows to read at a time, internally.
Defaults to round(sqrt(total-number-of-columns-or-rows-to-read)).
format : {'csc','csr'}, optional
The desired format of the sparse matrix.
Defaults to ``csc`` (Compressed Sparse Column, which is SNP-major).
num_threads: None or int, optional
The number of threads with which to read data. Defaults to all available
processors.
Can also be set with :class:`open_bed` or these
environment variables (listed in priority order):
'PST_NUM_THREADS', 'NUM_THREADS', 'MKL_NUM_THREADS'.
max_concurrent_requests: None or int, optional
The maximum number of concurrent requests to make to the cloud storage
service. Defaults to 10.
max_chunk_bytes: None or int, optional
The maximum number of bytes to read in a single request to the cloud
storage service. Defaults to 8MB.
Returns
-------
a :class:`scipy.sparse.csc_matrix` (default) or :class:`scipy.sparse.csr_matrix`
Rows represent individuals (samples). Columns represent SNPs (variants).
For ``dtype`` 'float32' and 'float64', NaN indicates missing values.
For 'int8', -127 indicates missing values.
The memory used by the final sparse matrix is approximately:
# of non-zero values * (4 bytes + 1 byte (for int8))
For example, consider reading 1000 individuals (samples) x 50,000 SNPs (variants)
into csc format where the data is 97% sparse.
The memory used will be about 7.5 MB (1000 x 50,000 x 3% x 5 bytes).
This is 15% of the 50 MB needed by a dense matrix.
Internally, the function reads the data via small dense matrices.
For this example, by default, the function will read 1000 individuals x 224 SNPs
(because 224 * 224 is about 50,000).
The memory used by the small dense matrix is 1000 x 244 x 1 byte (for int8) = 0.224 MB.
You can set `batch_size`. Larger values will be faster.
Smaller values will use less memory.
For this example, we might want to set the `batch_size` to 5000. Then,
the memory used by the small dense matrix
would be 1000 x 5000 x 1 byte (for int8) = 5 MB,
similar to the 7.5 MB needed for the final sparse matrix.
Examples
--------
Read all data in a .bed file into a :class:`scipy.sparse.csc_matrix`.
The file has 10 individuals (samples) by 20 SNPs (variants).
All but eight values are 0.
.. doctest::
>>> # pip install bed-reader[samples,sparse] # if needed
>>> from bed_reader import open_bed, sample_file
>>>
>>> file_name = sample_file("sparse.bed")
>>> with open_bed(file_name) as bed:
... print(bed.shape)
... val_sparse = bed.read_sparse(dtype="int8")
... print(val_sparse) # doctest:+NORMALIZE_WHITESPACE
(10, 20)
(8, 4) 1
(8, 5) 2
(0, 8) 2
(4, 9) 1
(7, 9) 1
(5, 11) 1
(2, 12) 1
(3, 12) 1
To read selected individuals (samples) and/or SNPs (variants), set each part of
a :data:`numpy.s_` to an `int`, a list of `int`, a slice expression, or
a list of `bool`.
Negative integers count from the end of the list.
.. doctest::
>>> import numpy as np
>>> bed = open_bed(file_name)
>>> print(bed.read_sparse(np.s_[:,5], dtype="int8")) # read the SNPs indexed by 5. # doctest:+NORMALIZE_WHITESPACE
(8, 0) 2
>>> # read the SNPs indexed by 5, 4, and 0
>>> print(bed.read_sparse(np.s_[:,[5,4,0]], dtype="int8")) # doctest:+NORMALIZE_WHITESPACE
(8, 0) 2
(8, 1) 1
>>> # read SNPs from 1 (inclusive) to 11 (exclusive)
>>> print(bed.read_sparse(np.s_[:,1:11], dtype="int8")) # doctest:+NORMALIZE_WHITESPACE
(8, 3) 1
(8, 4) 2
(0, 7) 2
(4, 8) 1
(7, 8) 1
>>> print(np.unique(bed.chromosome)) # print unique chrom values
['1' '5' 'Y']
>>> # read all SNPs in chrom 5
>>> print(bed.read_sparse(np.s_[:,bed.chromosome=='5'], dtype="int8")) # doctest:+NORMALIZE_WHITESPACE
(8, 0) 1
(8, 1) 2
(0, 4) 2
(4, 5) 1
(7, 5) 1
(5, 7) 1
(2, 8) 1
(3, 8) 1
>>> # Read 1st individual (across all SNPs)
>>> print(bed.read_sparse(np.s_[0,:], dtype="int8")) # doctest:+NORMALIZE_WHITESPACE
(0, 8) 2
>>> print(bed.read_sparse(np.s_[::2,:], dtype="int8")) # Read every 2nd individual # doctest:+NORMALIZE_WHITESPACE
(4, 4) 1
(4, 5) 2
(0, 8) 2
(2, 9) 1
(1, 12) 1
>>> # read last and 2nd-to-last individuals and the 15th-from-the-last SNP
>>> print(bed.read_sparse(np.s_[[-1,-2],-15], dtype="int8")) # doctest:+NORMALIZE_WHITESPACE
(1, 0) 2
"""
if sparse is None:
raise ImportError(
"The function read_sparse() requires scipy. "
+ "Install it with 'pip install --upgrade bed-reader[sparse]'."
)
iid_index_or_slice_etc, sid_index_or_slice_etc = self._split_index(index)
dtype = np.dtype(dtype)
# Similar code in read().
# Later happy with _iid_range and _sid_range or could it be done with
# allocation them?
if self._iid_range is None:
self._iid_range = np.arange(self.iid_count, dtype="intp")
if self._sid_range is None:
self._sid_range = np.arange(self.sid_count, dtype="intp")
iid_index = np.ascontiguousarray(
self._iid_range[iid_index_or_slice_etc],
dtype="intp",
)
sid_index = np.ascontiguousarray(
self._sid_range[sid_index_or_slice_etc], dtype="intp"
)
if (
len(iid_index) > np.iinfo(np.int32).max
or len(sid_index) > np.iinfo(np.int32).max
):
raise ValueError(
"Too (many Individuals or SNPs (variants) requested. Maximum is {np.iinfo(np.int32).max}."
)
if batch_size is None:
batch_size = round(np.sqrt(len(sid_index)))
num_threads = get_num_threads(
self._num_threads if num_threads is None else num_threads
)
max_concurrent_requests = get_max_concurrent_requests(
self._max_concurrent_requests
if max_concurrent_requests is None
else max_concurrent_requests
)
max_chunk_bytes = get_max_chunk_bytes(
self._max_chunk_bytes if max_chunk_bytes is None else max_chunk_bytes
)
if format == "csc":
order = "F"
indptr = np.zeros(len(sid_index) + 1, dtype=np.int32)
elif format == "csr":
order = "C"
indptr = np.zeros(len(iid_index) + 1, dtype=np.int32)
else:
raise ValueError(f"format '{format}' not known. Expected 'csc' or 'csr'.")
# We init data and indices with zero element arrays to set their dtype.
data = [np.empty(0, dtype=dtype)]
indices = [np.empty(0, dtype=np.int32)]
if self.iid_count > 0 and self.sid_count > 0:
reader, location_str, is_cloud = self._pick_reader(dtype)
if format == "csc":
val = np.zeros((len(iid_index), batch_size), order=order, dtype=dtype)
for batch_start in range(0, len(sid_index), batch_size):
batch_end = batch_start + batch_size
if batch_end > len(sid_index):
batch_end = len(sid_index)
del val
val = np.zeros(
(len(iid_index), batch_end - batch_start),
order=order,
dtype=dtype,
)
batch_slice = np.s_[batch_start:batch_end]
batch_index = sid_index[batch_slice]
if not is_cloud:
reader(
location_str,
self.cloud_options,
iid_count=self.iid_count,
sid_count=self.sid_count,
is_a1_counted=self.count_A1,
iid_index=iid_index,
sid_index=batch_index,
val=val,
num_threads=num_threads,
)
else:
reader(
location_str,
self.cloud_options,
iid_count=self.iid_count,
sid_count=self.sid_count,
is_a1_counted=self.count_A1,
iid_index=iid_index,
sid_index=batch_index,
val=val,
num_threads=num_threads,
max_concurrent_requests=max_concurrent_requests,
max_chunk_bytes=max_chunk_bytes,
)
self.sparsify(
val, order, iid_index, batch_slice, data, indices, indptr
)
else:
assert format == "csr" # real assert
val = np.zeros((batch_size, len(sid_index)), order=order, dtype=dtype)
for batch_start in range(0, len(iid_index), batch_size):
batch_end = batch_start + batch_size
if batch_end > len(iid_index):
batch_end = len(iid_index)
del val
val = np.zeros(
(batch_end - batch_start, len(sid_index)),
order=order,
dtype=dtype,
)
batch_slice = np.s_[batch_start:batch_end]
batch_index = iid_index[batch_slice]
if not is_cloud:
reader(
location_str,
self.cloud_options,
iid_count=self.iid_count,
sid_count=self.sid_count,
is_a1_counted=self.count_A1,
iid_index=batch_index,
sid_index=sid_index,
val=val,
num_threads=num_threads,
)
else:
reader(
location_str,
self.cloud_options,
iid_count=self.iid_count,
sid_count=self.sid_count,
is_a1_counted=self.count_A1,
iid_index=batch_index,
sid_index=sid_index,
val=val,
num_threads=num_threads,
max_concurrent_requests=max_concurrent_requests,
max_chunk_bytes=max_chunk_bytes,
)
self.sparsify(
val, order, sid_index, batch_slice, data, indices, indptr
)
data = np.concatenate(data)
indices = np.concatenate(indices)
if format == "csc":
return sparse.csc_matrix(
(data, indices, indptr), (len(iid_index), len(sid_index))
)
else:
assert format == "csr" # real assert
return sparse.csr_matrix(
(data, indices, indptr), (len(iid_index), len(sid_index))
)
def sparsify(self, val, order, minor_index, batch_slice, data, indices, indptr):
flatten = np.ravel(val, order=order)
nz_indices = np.flatnonzero(flatten).astype(np.int32)
column_indexes = nz_indices // len(minor_index)
counts = np.bincount(
column_indexes, minlength=batch_slice.stop - batch_slice.start
).astype(np.int32)
counts_with_initial = np.r_[
indptr[batch_slice.start : batch_slice.start + 1], counts
]
data.append(flatten[nz_indices])
indices.append(np.mod(nz_indices, len(minor_index)))
indptr[1:][batch_slice] = np.cumsum(counts_with_initial)[1:]
def _read_csv(filepath, delimiter=None, dtype=None, usecols=None):
# Prepare the usecols by ensuring it is a list of indices
usecols_indices = list(usecols)
transposed = np.loadtxt(
filepath,
dtype=np.str_,
delimiter=delimiter,
usecols=usecols_indices,
unpack=True,
)
if transposed.ndim == 1:
transposed = transposed.reshape(-1, 1)
row_count = transposed.shape[1] # because unpack=True
# Convert column lists to numpy arrays with the specified dtype
columns = []
for output_index, input_index in enumerate(usecols_indices):
col = transposed[output_index]
# Find the dtype for this column
col_dtype = dtype.get(input_index, np.str_)
# Convert the column list to a numpy array with the specified dtype
columns.append(_convert_to_dtype(col, col_dtype))
return columns, row_count
def _convert_to_dtype(str_arr, dtype):
assert dtype in [np.str_, np.float32, np.int32] # real assert
if dtype == np.str_:
return str_arr
try:
new_arr = str_arr.astype(dtype)
except ValueError as e:
if dtype == np.float32:
raise e
# for backwards compatibility, see if intermediate float helps int conversion
try:
assert dtype == np.int32 # real assert
float_arr = str_arr.astype(np.float32)
except ValueError:
raise e
new_arr = float_arr.astype(np.int32)
if not np.array_equal(new_arr, float_arr):
raise ValueError(
f"invalid literal for int: '{str_arr[np.where(new_arr != float_arr)][:1]}')"
)
return new_arr
if __name__ == "__main__":
import pytest
logging.basicConfig(level=logging.INFO)
pytest.main(["--doctest-modules", __file__])