"""The lower-level records API."""
import re
from abc import ABC, abstractmethod
from typing import (
TYPE_CHECKING,
ClassVar,
Iterable,
List,
Optional,
Pattern,
Tuple,
Type,
TypeVar,
Union,
cast,
)
from attr.validators import optional
from attrs import define, field
from netsgiro import RecordType, ServiceCode
from netsgiro.converters import (
fixed_len_str,
stripped_newlines,
to_assignment_type,
to_avtalegiro_registration_type,
to_bool,
to_date,
to_date_or_none,
to_int_or_none,
to_record_type,
to_safe_str_or_none,
to_service_code,
to_transaction_type,
)
from netsgiro.validators import str_of_length, str_of_max_length
if TYPE_CHECKING:
import datetime
from netsgiro.enums import AssignmentType, AvtaleGiroRegistrationType, TransactionType
__all__: List[str] = [
'TransmissionStart',
'TransmissionEnd',
'AssignmentStart',
'AssignmentEnd',
'TransactionAmountItem1',
'TransactionAmountItem2',
'TransactionAmountItem3',
'TransactionSpecification',
'AvtaleGiroAgreement',
'parse',
]
R = TypeVar('R', bound='Record')
@define
class Record(ABC):
"""Record base class."""
_PATTERNS: ClassVar[List[Pattern]]
RECORD_TYPE: ClassVar[RecordType]
service_code: ServiceCode = field(converter=to_service_code)
@classmethod
def from_string(cls: Type[R], line: str) -> R:
"""Parse OCR string into a record object."""
for pattern in cls._PATTERNS:
matches = pattern.match(line)
if matches is not None:
return cls(**matches.groupdict())
raise ValueError(f'{line!r} did not match {cls.__name__} record formats')
@abstractmethod
def to_ocr(self) -> str:
"""Get record as OCR string."""
[docs]@define
class TransmissionStart(Record):
"""TransmissionStart is the first record in every OCR file.
A file can only contain a single transmission.
Each transmission can contain any number of assignments.
"""
transmission_number: str = field(validator=str_of_length(7))
data_transmitter: str = field(validator=str_of_length(8))
data_recipient: str = field(validator=str_of_length(8))
RECORD_TYPE: ClassVar[RecordType] = RecordType.TRANSMISSION_START
_PATTERNS: ClassVar[List[Pattern]] = [
re.compile(
r'''
^
NY # Format code
(?P<service_code>00)
00 # Transmission type, always 00
10 # Record type
(?P<data_transmitter>\d{8})
(?P<transmission_number>\d{7})
(?P<data_recipient>\d{8})
0{49} # Padding
$
''',
re.VERBOSE,
)
]
[docs] def to_ocr(self) -> str:
"""Get record as OCR string."""
return (
f'NY000010{self.data_transmitter:8}{self.transmission_number:7}{self.data_recipient:8}'
+ ('0' * 49)
)
[docs]@define
class TransmissionEnd(Record):
"""TransmissionEnd is the first record in every OCR file."""
num_transactions: int = field(converter=int)
num_records: int = field(converter=int)
total_amount: int = field(converter=int)
nets_date: 'datetime.date' = field(converter=to_date_or_none)
RECORD_TYPE: ClassVar[RecordType] = RecordType.TRANSMISSION_END
_PATTERNS: ClassVar[List[Pattern]] = [
re.compile(
r'''
^
NY # Format code
(?P<service_code>00)
00 # Transmission type, always 00
89 # Record type
(?P<num_transactions>\d{8})
(?P<num_records>\d{8})
(?P<total_amount>\d{17})
(?P<nets_date>\d{6})
0{33} # Filler
$
''',
re.VERBOSE,
)
]
[docs] def to_ocr(self) -> str:
"""Get record as OCR string."""
return (
'NY000089'
f'{self.num_transactions:08d}'
f'{self.num_records:08d}'
f'{self.total_amount:017d}'
f'{self.nets_date:%d%m%y}'
+ ('0' * 33)
)
[docs]@define
class AssignmentStart(Record):
"""AssignmentStart is the first record of an assignment.
Each assignment can contain any number of transactions.
"""
assignment_type: 'AssignmentType' = field(converter=to_assignment_type)
assignment_number: str = field(validator=str_of_length(7))
assignment_account: str = field(validator=str_of_length(11))
# Only for assignment_type == AssignmentType.TRANSACTIONS
agreement_id: Optional[str] = field(default=None, validator=optional(str_of_length(9)))
RECORD_TYPE: ClassVar[RecordType] = RecordType.ASSIGNMENT_START
_PATTERNS: ClassVar[List[Pattern]] = [
re.compile(
r'''
^
NY # Format code
(?P<service_code>(09|21))
(?P<assignment_type>00)
20 # Record type
(?P<agreement_id>\d{9})
(?P<assignment_number>\d{7})
(?P<assignment_account>\d{11})
0{45} # Filler
$
''',
re.VERBOSE,
),
re.compile(
r'''
^
NY # Format code
(?P<service_code>21)
(?P<assignment_type>24)
20 # Record type
0{9} # Filler
(?P<assignment_number>\d{7})
(?P<assignment_account>\d{11})
0{45} # Filler
$
''',
re.VERBOSE,
),
re.compile(
r'''
^
NY # Format code
(?P<service_code>21)
(?P<assignment_type>36)
20 # Record type
0{9} # Filler
(?P<assignment_number>\d{7})
(?P<assignment_account>\d{11})
0{45} # Filler
$
''',
re.VERBOSE,
),
]
[docs] def to_ocr(self) -> str:
"""Get record as OCR string."""
return (
f'NY{self.service_code:02d}{self.assignment_type:02d}20'
+ (self.agreement_id and f'{self.agreement_id:9}' or ('0' * 9))
+ f'{self.assignment_number:7}{self.assignment_account:11}'
+ ('0' * 45)
)
[docs]@define
class AssignmentEnd(Record):
"""AssignmentEnd is the last record of an assignment."""
assignment_type: 'AssignmentType' = field(converter=to_assignment_type)
num_transactions: int = field(converter=int)
num_records: int = field(converter=int)
# Only for transactions and cancellations
total_amount: Optional[int] = field(default=None, converter=to_int_or_none)
nets_date_1: Optional['datetime.date'] = field(default=None, converter=to_date_or_none)
nets_date_2: Optional['datetime.date'] = field(default=None, converter=to_date_or_none)
nets_date_3: Optional['datetime.date'] = field(default=None, converter=to_date_or_none)
RECORD_TYPE: ClassVar[RecordType] = RecordType.ASSIGNMENT_END
_PATTERNS: ClassVar[List[Pattern]] = [
re.compile(
r'''
^
NY # Format code
(?P<service_code>(09|21))
(?P<assignment_type>00) # Transactions / payment requests
88 # Record type
(?P<num_transactions>\d{8})
(?P<num_records>\d{8})
(?P<total_amount>\d{17})
(?P<nets_date_1>\d{6})
(?P<nets_date_2>\d{6})
(?P<nets_date_3>\d{6})
0{21} # Filler
$
''',
re.VERBOSE,
),
re.compile(
r'''
^
NY # Format code
(?P<service_code>21)
(?P<assignment_type>24) # AvtaleGiro agreements
88 # Record type
(?P<num_transactions>\d{8})
(?P<num_records>\d{8})
0{56} # Filler
$
''',
re.VERBOSE,
),
re.compile(
r'''
^
NY # Format code
(?P<service_code>21)
(?P<assignment_type>36) # AvtaleGiro cancellations
88 # Record type
(?P<num_transactions>\d{8})
(?P<num_records>\d{8})
(?P<total_amount>\d{17})
(?P<nets_date_1>\d{6})
(?P<nets_date_2>\d{6})
0{27} # Filler
$
''',
re.VERBOSE,
),
]
@property
def nets_date(self) -> Optional['datetime.date']:
"""Nets' processing date.
Only used for OCR Giro.
"""
return self.nets_date_1 if self.service_code == ServiceCode.OCR_GIRO else None
@property
def nets_date_earliest(self) -> Optional['datetime.date']:
"""Earliest date from the contained transactions."""
if self.service_code == ServiceCode.OCR_GIRO:
return self.nets_date_2
elif self.service_code == ServiceCode.AVTALEGIRO:
return self.nets_date_1
else: # pragma: no cover
raise ValueError(f'Unhandled service code: {self.service_code}')
@property
def nets_date_latest(self) -> Optional['datetime.date']:
"""Latest date from the contained transactions."""
if self.service_code == ServiceCode.OCR_GIRO:
return self.nets_date_3
elif self.service_code == ServiceCode.AVTALEGIRO:
return self.nets_date_2
else: # pragma: no cover
raise ValueError(f'Unhandled service code: {self.service_code}')
[docs] def to_ocr(self) -> str:
"""Get record as OCR string."""
return (
'NY'
f'{self.service_code:02d}'
f'{self.assignment_type:02d}'
'88'
f'{self.num_transactions:08d}'
f'{self.num_records:08d}'
+ (self.total_amount and f'{self.total_amount:017d}' or ('0' * 17))
+ (self.nets_date_1 and f'{self.nets_date_1:%d%m%y}' or ('0' * 6))
+ (self.nets_date_2 and f'{self.nets_date_2:%d%m%y}' or ('0' * 6))
+ (self.nets_date_3 and f'{self.nets_date_3:%d%m%y}' or ('0' * 6))
+ ('0' * 21)
)
@define
class TransactionRecord(Record):
"""Transaction record base class."""
transaction_type: 'TransactionType' = field(converter=to_transaction_type)
transaction_number: int = field(converter=int)
[docs]@define
class TransactionAmountItem1(TransactionRecord):
"""TransactionAmountItem1 is the first record of a transaction.
The record is used both for AvtaleGiro and for OCR Giro.
"""
nets_date: 'datetime.date' = field(converter=to_date)
amount: int = field(converter=int)
kid: Optional[str] = field(
converter=to_safe_str_or_none, validator=optional(str_of_max_length(25))
)
# Only OCR Giro
centre_id: Optional[str] = field(default=None, validator=optional(str_of_length(2)))
day_code: Optional[int] = field(default=None, converter=to_int_or_none)
partial_settlement_number: Optional[int] = field(default=None, converter=to_int_or_none)
partial_settlement_serial_number: Optional[str] = field(
default=None, validator=optional(str_of_length(5))
)
sign: Optional[str] = field(default=None, validator=optional(str_of_length(1)))
RECORD_TYPE: ClassVar[RecordType] = RecordType.TRANSACTION_AMOUNT_ITEM_1
_PATTERNS: ClassVar[List[Pattern]] = [
re.compile(
r'''
^
NY # Format code
(?P<service_code>09)
(?P<transaction_type>\d{2}) # 10-21
30 # Record type
(?P<transaction_number>\d{7})
(?P<nets_date>\d{6})
(?P<centre_id>\d{2})
(?P<day_code>\d{2})
(?P<partial_settlement_number>\d{1})
(?P<partial_settlement_serial_number>\d{5})
(?P<sign>[-0]{1})
(?P<amount>\d{17})
(?P<kid>[\d ]{25})
0{6} # Filler
$
''',
re.VERBOSE,
),
re.compile(
r'''
^
NY # Format code
(?P<service_code>21)
(?P<transaction_type>\d{2}) # 02, 21, or 93
30 # Record type
(?P<transaction_number>\d{7})
(?P<nets_date>\d{6})
[ ]{11} # Filler
(?P<amount>\d{17})
(?P<kid>[\d ]{25})
0{6} # Filler
$
''',
re.VERBOSE,
),
]
[docs] def to_ocr(self) -> str:
"""Get record as OCR string."""
if self.service_code == ServiceCode.OCR_GIRO:
ocr_giro_fields = (
f'{self.centre_id:2}'
f'{self.day_code:02d}'
f'{self.partial_settlement_number:01d}'
f'{self.partial_settlement_serial_number:5}'
f'{self.sign:1}'
)
else:
ocr_giro_fields = ' ' * 11
return (
'NY'
f'{self.service_code:02d}'
f'{self.transaction_type:02d}'
'30'
f'{self.transaction_number:07d}'
f'{self.nets_date:%d%m%y}'
+ ocr_giro_fields
+ f'{self.amount:017d}{self.kid:>25}'
+ ('0' * 6)
)
[docs]@define
class TransactionAmountItem2(TransactionRecord):
"""TransactionAmountItem2 is the second record of a transaction.
The record is used both for AvtaleGiro and for OCR Giro.
"""
# TODO Validate `reference` length, which depends on service code
reference: Optional[str] = field(converter=to_safe_str_or_none)
# Only OCR Giro
form_number: Optional[str] = field(default=None, validator=optional(str_of_length(10)))
bank_date: Optional['datetime.date'] = field(default=None, converter=to_date_or_none)
debit_account: Optional[str] = field(default=None, validator=optional(str_of_length(11)))
# XXX In use in OCR Giro "from giro debited account" transactions in test
# data, but documented as a filler field.
_filler: Optional[str] = field(default=None)
# Only AvtaleGiro
payer_name: Optional[str] = field(default=None, converter=to_safe_str_or_none)
RECORD_TYPE: ClassVar[RecordType] = RecordType.TRANSACTION_AMOUNT_ITEM_2
_PATTERNS: ClassVar[List[Pattern]] = [
re.compile(
r'''
^
NY # Format code
(?P<service_code>09)
(?P<transaction_type>\d{2}) # 10-21
31 # Record type
(?P<transaction_number>\d{7})
(?P<form_number>\d{10})
(?P<reference>\d{9})
(?P<filler>.{7}) # XXX Documented as filler, in use in test data
(?P<bank_date>\d{6})
(?P<debit_account>\d{11})
0{22} # Filler
$
''',
re.VERBOSE,
),
re.compile(
r'''
^
NY # Format code
(?P<service_code>21)
(?P<transaction_type>\d{2}) # 02, 21, or 93
31 # Record type
(?P<transaction_number>\d{7})
(?P<payer_name>.{10})
[ ]{25} # Filler
(?P<reference>.{25})
0{5} # Filler
$
''',
re.VERBOSE,
),
]
[docs] def to_ocr(self) -> str:
"""Get record as OCR string."""
common_fields = (
f'NY{self.service_code:02d}{self.transaction_type:02d}31{self.transaction_number:07d}'
)
if self.service_code == ServiceCode.OCR_GIRO:
service_fields = (
f'{self.form_number:10}'
+ (self.reference and f'{self.reference:9}' or (' ' * 9))
+ (self._filler and f'{self._filler:7}' or ('0' * 7))
+ (self.bank_date and f'{self.bank_date:%d%m%y}' or '0' * 6)
+ f'{self.debit_account:11}'
+ ('0' * 22)
)
elif self.service_code == ServiceCode.AVTALEGIRO:
service_fields = (
(self.payer_name and f'{self.payer_name[:10]:10}' or (' ' * 10))
+ (' ' * 25)
+ (self.reference and f'{self.reference:25}' or (' ' * 25))
+ ('0' * 5)
)
else: # pragma: no cover
service_fields = ' ' * 35
return common_fields + service_fields
[docs]@define
class TransactionAmountItem3(TransactionRecord):
"""TransactionAmountItem3 is the third record of a transaction.
The record is only used for some OCR Giro transaction types.
"""
text: Optional[str] = field(
converter=to_safe_str_or_none, validator=optional(str_of_max_length(40))
)
RECORD_TYPE: ClassVar[RecordType] = RecordType.TRANSACTION_AMOUNT_ITEM_3
_PATTERNS: ClassVar[List[Pattern]] = [
re.compile(
r'''
^
NY # Format code
(?P<service_code>09)
(?P<transaction_type>\d{2}) # 20-21
32 # Record type
(?P<transaction_number>\d{7})
(?P<text>.{40})
0{25} # Filler
$
''',
re.VERBOSE,
)
]
[docs] def to_ocr(self) -> str:
"""Get record as OCR string."""
return (
f'NY09{self.transaction_type:02d}32{self.transaction_number:07d}'
+ (self.text and f'{self.text:40}' or (' ' * 40))
+ ('0' * 25)
)
[docs]@define
class TransactionSpecification(TransactionRecord):
"""TransactionSpecification is used for AvtaleGiro transactions.
The record is only used when bank notification is used to notify the payer.
Each record contains half of an 80 char long line of text and can be
repeated up to 84 times for a single transaction for a total of 42 lines of
specification text.
"""
line_number: int = field(converter=int)
column_number: int = field(converter=int)
text = field(
converter=stripped_newlines(fixed_len_str(40, str)), # type: ignore[misc]
validator=optional(str_of_max_length(40)),
)
RECORD_TYPE: ClassVar[RecordType] = RecordType.TRANSACTION_SPECIFICATION
_PATTERNS: ClassVar[List[Pattern]] = [
re.compile(
r'''
^
NY # Format code
(?P<service_code>21)
(?P<transaction_type>21)
49 # Record type
(?P<transaction_number>\d{7})
4 # Payment notification
(?P<line_number>\d{3})
(?P<column_number>\d{1})
(?P<text>.{40})
0{20} # Filler
$
''',
re.VERBOSE,
)
]
_MAX_LINES = 42
_MAX_LINE_LENGTH = 80
_MAX_COLUMNS = 2
_MAX_RECORDS = _MAX_LINES * _MAX_COLUMNS
[docs] @classmethod
def from_text(
cls,
*,
service_code: ServiceCode,
transaction_type: 'TransactionType',
transaction_number: int,
text: Optional[str],
) -> Iterable['TransactionSpecification']:
"""Create a sequence of specification records from a text string."""
for line, column, txt in cls._split_text_to_lines_and_columns(text or ''):
yield cls(
service_code=service_code,
transaction_type=transaction_type,
transaction_number=transaction_number,
line_number=line,
column_number=column,
text=txt,
)
@classmethod
def _split_text_to_lines_and_columns(cls, text: str) -> Iterable[Tuple[int, int, str]]:
lines = text.splitlines()
if len(lines) > cls._MAX_LINES:
raise ValueError(f'Max {cls._MAX_LINES} specification lines allowed, got {len(lines)}')
for line_number, line_text in enumerate(lines, 1):
if len(line_text) > cls._MAX_LINE_LENGTH:
raise ValueError(
'Specification lines must be max {} chars long, got {}: {!r}'.format(
cls._MAX_LINE_LENGTH, len(line_text), line_text
)
)
yield line_number, 1, f'{line_text[:40]:40}'
yield line_number, 2, f'{line_text[40:80]:40}'
[docs] @classmethod
def to_text(cls, records: List['TransactionSpecification']) -> str:
"""Get a text string from a sequence of specification records."""
if len(records) > cls._MAX_RECORDS:
raise ValueError(
f'Max {cls._MAX_RECORDS} specification records allowed, got {len(records)}'
)
tuples = sorted((r.line_number, r.column_number, r) for r in records)
text = ''
for _, column, specification in tuples:
if specification.text:
text += specification.text
if column == cls._MAX_COLUMNS:
text += '\n'
return text
[docs] def to_ocr(self) -> str:
"""Get record as OCR string."""
return (
'NY212149'
f'{self.transaction_number:07d}'
'4'
f'{self.line_number:03d}'
f'{self.column_number:01d}'
f'{self.text:40}'
+ ('0' * 20)
)
[docs]@define
class AvtaleGiroAgreement(TransactionRecord):
"""AvtaleGiroAgreement is used by Nets to notify about agreement changes.
This includes new or deleted agreements, as well as updates to the payer's
notification preferences.
"""
registration_type: 'AvtaleGiroRegistrationType' = field(
converter=to_avtalegiro_registration_type
)
kid: Optional[str] = field(
converter=to_safe_str_or_none, validator=optional(str_of_max_length(25))
)
notify: bool = field(converter=to_bool)
RECORD_TYPE: ClassVar[RecordType] = RecordType.TRANSACTION_AGREEMENTS
_PATTERNS: ClassVar[List[Pattern]] = [
re.compile(
r'''
^
NY # Format code
(?P<service_code>21)
(?P<transaction_type>94)
70 # Record type
(?P<transaction_number>\d{7})
(?P<registration_type>\d{1})
(?P<kid>[\d ]{25})
(?P<notify>[JN]{1})
0{38} # Filler
$
''',
re.VERBOSE,
)
]
[docs] def to_ocr(self) -> str:
"""Get record as OCR string."""
return (
f'NY219470{self.transaction_number:07d}{self.registration_type:01d}{self.kid:>25}'
+ (self.notify and 'J' or 'N')
+ ('0' * 38)
).format(self=self)
[docs]def parse(data: str) -> List[R]:
"""Parse an OCR file into a list of record objects."""
def all_subclasses(cls: Union[Type[R], Type[Record]]) -> List[Type[R]]:
"""Return a list of subclasses for a given class."""
classes = cls.__subclasses__() + [
subsubcls for subcls in cls.__subclasses__() for subsubcls in all_subclasses(subcls)
]
return cast(List[Type[R]], classes)
record_classes = {
cls.RECORD_TYPE: cls for cls in all_subclasses(Record) if hasattr(cls, 'RECORD_TYPE')
}
results: List[R] = []
for line in data.strip().splitlines():
if len(line) != 80:
raise ValueError('All lines must be exactly 80 chars long')
record_type_str = line[6:8]
if not record_type_str.isnumeric():
raise ValueError(f'Record type must be numeric, got {record_type_str!r}')
record_type = to_record_type(record_type_str)
record_cls = record_classes[record_type]
results.append(record_cls.from_string(line))
return results