from lxml import etree
import csv
import os
def extract_xml_element_text_to_csv_streaming(
    xml_file_path,
    csv_file_path,
    tag_names=None,
    element_names_to_extract=None,
    wildcard_namespace_tags=True
):
    """
    Stream child-element text of selected XML records into a CSV file.

    The document is parsed incrementally with ``etree.iterparse``. Each element
    whose tag matches ``tag_names`` yields one CSV row; the row's columns are the
    stripped ``.text`` of the record's *direct* children, looked up by local
    (namespace-free) name. Missing children produce an empty string. If a record
    has several children with the same local name, the last one wins.

    Processed records are cleared and already-handled preceding siblings are
    detached from the tree, so memory use stays bounded on very large files.

    Args:
        xml_file_path (str): Path to the input XML file.
        csv_file_path (str): Path to the output CSV file (overwritten).
        tag_names (str or list): Tag name(s) of the record elements. A single
            string (e.g. 'person') or a list of strings
            (e.g. ['person', 'organization']). Defaults to ['person'] if None.
            Use '{uri}tag_name' for an explicit namespace, or rely on the wildcard.
        element_names_to_extract (list): Local names of the child elements whose
            text becomes the CSV columns, in this order; also written as the
            header row. Defaults to ['name', 'city', 'school'] if None.
        wildcard_namespace_tags (bool): If True, a tag in ``tag_names`` that has
            no explicit '{uri}' prefix is turned into '{*}tag' so it matches in
            any namespace or none. Defaults to True.

    Returns:
        bool: True when the CSV was written, or when ``tag_names`` /
        ``element_names_to_extract`` is an empty list (nothing to do, no file
        is written). False when the XML file does not exist, an argument has
        the wrong type, or any error occurs while parsing or writing.
    """
    if not os.path.exists(xml_file_path):
        print(f"Error: XML file not found at '{xml_file_path}'")
        return False

    if tag_names is None:
        tag_names = ['person']
    elif isinstance(tag_names, str):
        tag_names = [tag_names]
    elif not isinstance(tag_names, list) or not all(isinstance(tag, str) for tag in tag_names):
        print("Error: 'tag_names' must be a string or a list of strings.")
        return False
    if not tag_names:
        print("Warning: 'tag_names' is empty. No elements will be processed.")
        return True

    # Tags that already carry a '{uri}' part are used verbatim; bare names get the
    # '{*}' any-namespace wildcard unless the caller disabled it.
    target_tags_for_iterparse = []
    for tag in tag_names:
        if '{' in tag and '}' in tag:
            target_tags_for_iterparse.append(tag)
        elif wildcard_namespace_tags:
            target_tags_for_iterparse.append(f"{{*}}{tag}")
        else:
            target_tags_for_iterparse.append(tag)

    if element_names_to_extract is None:
        element_names_to_extract = ['name', 'city', 'school']
    elif not isinstance(element_names_to_extract, list) or not all(isinstance(elem_name, str) for elem_name in element_names_to_extract):
        print("Error: 'element_names_to_extract' must be a list of strings (local names).")
        return False
    if not element_names_to_extract:
        print("Warning: 'element_names_to_extract' is empty. No element texts will be extracted.")
        return True

    print(f"Starting extraction from '{xml_file_path}' to '{csv_file_path}' for tags: {tag_names}...")
    print(f"Targeting child element texts (local names): {element_names_to_extract}")

    try:
        with open(csv_file_path, 'w', newline='', encoding='utf-8') as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow(element_names_to_extract)

            # 'start' events are requested only to track how many matching
            # records are currently open, so nested records can be told apart
            # from outermost ones when freeing memory below.
            context = etree.iterparse(
                xml_file_path,
                events=('start', 'end'),
                tag=target_tags_for_iterparse,
            )
            open_record_depth = 0
            for event, parent_elem in context:
                if event == 'start':
                    open_record_depth += 1
                    continue
                open_record_depth -= 1

                found_element_texts = {}
                for child_elem in parent_elem:
                    # Comments and processing instructions are also yielded as
                    # children, but their .tag is not a string and cannot be
                    # turned into a QName; they carry no column data.
                    if not isinstance(child_elem.tag, str):
                        continue
                    local_child_name = etree.QName(child_elem.tag).localname
                    found_element_texts[local_child_name] = (child_elem.text or '').strip()

                csv_writer.writerow([
                    found_element_texts.get(desired_elem_local_name, '')
                    for desired_elem_local_name in element_names_to_extract
                ])

                parent_elem.clear()
                # clear() empties the record but leaves it attached to its
                # parent, so the tree would still grow by one node per record.
                # Once no enclosing record is open, everything before this
                # element under the same parent is finished and can be dropped.
                # Nested records are skipped here so the enclosing record's
                # earlier children are still available when it ends.
                if open_record_depth == 0:
                    container = parent_elem.getparent()
                    if container is not None:
                        while parent_elem.getprevious() is not None:
                            del container[0]

        print(f"Successfully extracted data to '{csv_file_path}'")
        return True
    except FileNotFoundError:
        print(f"Error: Output CSV file path '{csv_file_path}' cannot be created.")
        return False
    except etree.XMLSyntaxError as e:
        print(f"Error: XML syntax error in '{xml_file_path}': {e}")
        return False
    except Exception as e:
        # Top-level boundary: the function's contract is to report failure via
        # its boolean return value rather than propagate.
        print(f"An unexpected error occurred: {e}")
        return False
if __name__ == "__main__":
    # Demo: generate a namespaced sample document, run three extractions
    # against it, and remove every generated file afterwards.
    dummy_xml_file_text_ns = "data_text_ns.xml"
    num_records = 50000
    output_csv_text_1 = "persons_text_data.csv"
    output_csv_text_2 = "organizations_text_data.csv"
    output_csv_text_3 = "combined_text_data.csv"

    try:
        # Records are written straight to disk rather than accumulated in one
        # large string with repeated '+=', which would hold the whole document
        # in memory before the first byte is written.
        with open(dummy_xml_file_text_ns, 'w', encoding='utf-8') as f:
            f.write("""<?xml version="1.0" encoding="UTF-8"?>
<root xmlns:ns="http://example.com/ns" xmlns:other="http://example.com/other">
""")
            for i in range(num_records // 2):
                f.write(f"""
<ns:person id='{i+1}'>
<name>Person {i}</name>
<city ns:type="home">NewYork {i % 5}</city>
<school>HighSchool {i % 3}</school>
<ns:gender>Male</ns:gender>
<description>This is a person record number {i}.</description>
</ns:person>
""")
            for i in range(num_records // 2):
                f.write(f"""
<other:organization org_id='{i+100000}'>
<name>Org {i}</name>
<city>London {i % 4}</city>
<other:sector>IT Solutions</other:sector>
<notes>Notes for organization {i}.</notes>
</other:organization>
""")
            f.write("</root>")
        print(f"Dummy XML file with element text and namespaces generated: {dummy_xml_file_text_ns}")

        print("\n--- Case 1: Extracting text from 'person' elements ---")
        extract_xml_element_text_to_csv_streaming(
            xml_file_path=dummy_xml_file_text_ns,
            csv_file_path=output_csv_text_1,
            tag_names='person',
            element_names_to_extract=['name', 'city', 'school', 'gender', 'description']
        )

        print("\n--- Case 2: Extracting text from 'organization' elements ---")
        extract_xml_element_text_to_csv_streaming(
            xml_file_path=dummy_xml_file_text_ns,
            csv_file_path=output_csv_text_2,
            tag_names='organization',
            element_names_to_extract=['name', 'city', 'sector', 'notes']
        )

        print("\n--- Case 3: Extracting text from 'person' and 'organization' (combined) ---")
        extract_xml_element_text_to_csv_streaming(
            xml_file_path=dummy_xml_file_text_ns,
            csv_file_path=output_csv_text_3,
            tag_names=['person', 'organization'],
            element_names_to_extract=['name', 'city', 'school', 'gender', 'description', 'sector', 'notes']
        )
    finally:
        # Runs even if generation or an extraction raised, so no demo files are
        # left behind; each path is checked because it may never have been created.
        print("\nCleaning up dummy files...")
        for generated_path in (
            dummy_xml_file_text_ns,
            output_csv_text_1,
            output_csv_text_2,
            output_csv_text_3,
        ):
            if os.path.exists(generated_path):
                os.remove(generated_path)
        print("Cleanup complete.")