Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 23 additions & 161 deletions src/dcmstack/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pydicom.dataset import PrivateBlock
from pydicom.datadict import keyword_for_tag, tag_for_keyword, private_dictionaries
from pydicom.charset import decode_element
from nibabel.nicom import csareader
from csa_header import CsaHeader

from .dcmstack import DicomStack

Expand Down Expand Up @@ -89,195 +89,54 @@ def ignore_except(tag, name, ds):
the function which takes the DICOM element and returns a dictionary.'''


def simplify_csa_dict(csa_dict):
'''Simplify the result of nibabel.nicom.csareader.
def translate_csa(elem):
'''Translate an element containing a Siemens CSA header

Simplifues the result of csa_header.CsaHeader

Parameters
----------
csa_dict : dict
The result from nibabel.nicom.csareader
elem
The element we are translating

Returns
-------
result : dict
Result where the keys come from the 'tags' sub dictionary of `csa_dict`.
The values come from the 'items' within that tags sub sub dictionary.
If items has only one element it will be unpacked from the list.
Simpler key -> value mapping
'''
csa_dict = CsaHeader(elem.value).read()
if csa_dict is None:
return None

result = {}
for tag in sorted(csa_dict['tags']):
items = []
for item in csa_dict['tags'][tag]['items']:
if isinstance(item, bytes):
try:
item = item.decode()
except UnicodeDecodeError:
item = ""
items.append(item)
if len(items) == 0:
for key, elem in csa_dict.items():
val = elem['value']
if val is None:
continue
elif len(items) == 1:
result[tag] = items[0]
else:
result[tag] = items
# TODO: Address this in csa_header package?
if isinstance(val, list):
val = ['' if x == b'' else x for x in val]
result[key] = val
return result


def csa_image_trans_func(elem):
'''Function for translating the CSA image sub header.'''
return simplify_csa_dict(csareader.read(elem.value))


csa_image_trans = Translator('CsaImage',
pydicom.tag.Tag(0x29, 0x1010),
'SIEMENS CSA HEADER',
csa_image_trans_func)
translate_csa)
'''Translator for the CSA image sub header.'''


class PhoenixParseError(Exception):
def __init__(self, line):
'''Exception indicating a error parsing a line from the Phoenix
Protocol.
'''
self.line = line

def __str__(self):
return 'Unable to parse phoenix protocol line: %s' % self.line


def _parse_phoenix_line(line, str_delim='""'):
delim_len = len(str_delim)
#Handle most comments (not always when string literal involved)
comment_idx = line.find('#')
if comment_idx != -1:
#Check if the pound sign is in a string literal
if line[:comment_idx].count(str_delim) == 1:
if line[comment_idx:].find(str_delim) == -1:
raise PhoenixParseError(line)
else:
line = line[:comment_idx]

#Allow empty lines
if line.strip() == '':
return None

#Find the first equals sign and use that to split key/value
equals_idx = line.find('=')
if equals_idx == -1:
raise PhoenixParseError(line)
key = line[:equals_idx].strip()
val_str = line[equals_idx + 1:].strip()

#If there is a string literal, pull that out
if val_str.startswith(str_delim):
end_quote = val_str[delim_len:].find(str_delim) + delim_len
if end_quote == -1:
raise PhoenixParseError(line)
elif not end_quote == len(val_str) - delim_len:
#Make sure remainder is just comment
if not val_str[end_quote+delim_len:].strip().startswith('#'):
raise PhoenixParseError(line)

return (key, val_str[2:end_quote])

else: #Otherwise try to convert to an int or float
val = None
try:
val = int(val_str)
except ValueError:
pass
else:
return (key, val)

try:
val = int(val_str, 16)
except ValueError:
pass
else:
return (key, val)

try:
val = float(val_str)
except ValueError:
pass
else:
return (key, val)

raise PhoenixParseError(line)


def parse_phoenix_prot(prot_key, prot_val):
'''Parse the MrPheonixProtocol string.

Parameters
----------
prot_str : str
The 'MrPheonixProtocol' string from the CSA Series sub header.

Returns
-------
prot_dict : dict
Meta data pulled from the ASCCONV section.

Raises
------
PhoenixParseError : A line of the ASCCONV section could not be parsed.
'''
if prot_key == 'MrPhoenixProtocol':
str_delim = '""'
elif prot_key == 'MrProtocol':
str_delim = '"'
else:
raise ValueError('Unknown protocol key: %s' % prot_key)
ascconv_start = prot_val.find('### ASCCONV BEGIN ')
ascconv_end = prot_val.find('### ASCCONV END ###')
ascconv = prot_val[ascconv_start:ascconv_end].split('\n')[1:-1]

result = {}
for line in ascconv:
parse_result = _parse_phoenix_line(line, str_delim)
if parse_result:
result[parse_result[0]] = parse_result[1]

return result


def csa_series_trans_func(elem):
'''Function for parsing the CSA series sub header.'''
csa_dict = simplify_csa_dict(csareader.read(elem.value))

#If there is a phoenix protocol, parse it and dump it into the csa_dict
phx_src = None
if 'MrPhoenixProtocol' in csa_dict:
phx_src = 'MrPhoenixProtocol'
elif 'MrProtocol' in csa_dict:
phx_src = 'MrProtocol'

if not phx_src is None:
phoenix_dict = parse_phoenix_prot(phx_src, csa_dict[phx_src])
del csa_dict[phx_src]
for key, val in phoenix_dict.items():
new_key = '%s.%s' % ('MrPhoenixProtocol', key)
csa_dict[new_key] = val

return csa_dict


csa_series_trans = Translator('CsaSeries',
pydicom.tag.Tag(0x29, 0x1020),
'SIEMENS CSA HEADER',
csa_series_trans_func)
translate_csa)
'''Translator for parsing the CSA series sub header.'''


def tag_to_str(tag):
'''Convert a DICOM tag to a string representation using the group and
element hex values seprated by an underscore.'''
return '%#X_%#X' % (tag.group, tag.elem)
return '%#x_%#x' % (tag.group, tag.elem)


unpack_vr_map = {'SL' : 'i',
Expand Down Expand Up @@ -386,7 +245,10 @@ def _get_priv_name(self, tag: BaseTag, pblocks: List[PrivateBlock]):
elem = priv_info[2]
break
if elem == "Unknown":
elem = "%#X" % (tag.elem & 0xFF)
if creator.upper() == "UNKNOWN":
elem = tag_to_str(tag)
else:
elem = "%#x" % (tag.elem & 0xFF)
else:
elem = ''.join([t[0].upper() + t[1:] for t in elem.split()])
return f"{creator.upper().replace(' ', '_')}.{elem}"
Expand Down
1 change: 1 addition & 0 deletions src/dcmstack/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
'nibabel >= 5.3.1',
'pylibjpeg[all] ; python_version >= "3.8"',
'pint',
'csa_header @ git+https://github.com/open-dicom/csa_header.git',
]

# Extra requirements for building documentation and testing
Expand Down