You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					314 lines
				
				9.2 KiB
			
		
		
			
		
	
	
					314 lines
				
				9.2 KiB
			| 
								 
											3 years ago
										 
									 | 
							
								#
							 | 
						||
| 
								 | 
							
								# This file is part of pyasn1 software.
							 | 
						||
| 
								 | 
							
								#
							 | 
						||
| 
								 | 
							
								# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
							 | 
						||
| 
								 | 
							
								# License: http://snmplabs.com/pyasn1/license.html
							 | 
						||
| 
								 | 
							
								#
							 | 
						||
| 
								 | 
							
								from pyasn1 import error
							 | 
						||
| 
								 | 
							
								from pyasn1.codec.ber import encoder
							 | 
						||
| 
								 | 
							
								from pyasn1.compat.octets import str2octs, null
							 | 
						||
| 
								 | 
							
								from pyasn1.type import univ
							 | 
						||
| 
								 | 
							
								from pyasn1.type import useful
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								__all__ = ['encode']
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class BooleanEncoder(encoder.IntegerEncoder):
								    """CER encoder for BOOLEAN values.

								    A value comparing equal to zero is emitted as the single octet 0,
								    every other value as the single octet 255.
								    """

								    def encodeValue(self, value, asn1Spec, encodeFun, **options):
								        """Return the content octet for `value`.

								        `asn1Spec`, `encodeFun` and `options` are accepted for interface
								        compatibility and are not consulted here.

								        Returns a 3-tuple ``(octets, False, False)``. The two trailing
								        flags are interpreted by the base encoder machinery -- presumably
								        "is constructed" and "substrate is already octets"; confirm
								        against pyasn1.codec.ber.encoder.
								        """
								        octets = (0,) if value == 0 else (255,)
								        return octets, False, False
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class RealEncoder(encoder.RealEncoder):
								    """REAL encoder whose base selection defers to `_dropFloatingPoint`."""

								    def _chooseEncBase(self, value):
								        """Unpack the 3-element `value` and pass it to `_dropFloatingPoint`.

								        The three elements are presumably (mantissa, base, exponent) --
								        confirm against the base class. Unpacking fails if `value` does
								        not hold exactly three items.
								        """
								        mantissa, base, exponent = value
								        return self._dropFloatingPoint(mantissa, base, exponent)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# specialized GeneralStringEncoder here
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class TimeEncoderMixIn(object):
								    """Validate and normalise a time value before encoding it as octets.

								    Intended to be mixed into a class that also derives from
								    ``encoder.OctetStringEncoder``; the final encoding step is delegated
								    to ``encoder.OctetStringEncoder.encodeValue``.
								    """

								    # Character codes compared against the output of value.asNumbers()
								    Z_CHAR = ord('Z')
								    PLUS_CHAR = ord('+')
								    MINUS_CHAR = ord('-')
								    COMMA_CHAR = ord(',')
								    DOT_CHAR = ord('.')
								    ZERO_CHAR = ord('0')

								    # Exclusive bounds on the length of the normalised value; concrete
								    # encoders override these.
								    MIN_LENGTH = 12
								    MAX_LENGTH = 19

								    def encodeValue(self, value, asn1Spec, encodeFun, **options):
								        """Check CER time constraints, normalise the fraction and encode.

								        Parameters
								        ----------
								        value:
								            Time value; cloned through `asn1Spec` when one is given. Must
								            provide ``asNumbers()`` and ``clone()``.
								        asn1Spec:
								            Optional schema object used to build the value, or None.
								        encodeFun:
								            Passed through to the octet string encoder.
								        options:
								            Encoding options; ``maxChunkSize`` is forced to 1000.

								        Returns
								        -------
								        Whatever ``encoder.OctetStringEncoder.encodeValue`` returns.

								        Raises
								        ------
								        ~pyasn1.error.PyAsn1Error
								            If the value is empty, carries a '+'/'-' offset, does not end
								            in 'Z', uses a comma, or violates the length bounds.
								        """
								        # CER encoding constraints:
								        # - minutes are mandatory, seconds are optional
								        # - sub-seconds must NOT be zero / no meaningless zeros
								        # - no hanging fraction dot
								        # - time in UTC (Z)
								        # - only dot is allowed for fractions

								        if asn1Spec is not None:
								            value = asn1Spec.clone(value)

								        numbers = value.asNumbers()

								        if self.PLUS_CHAR in numbers or self.MINUS_CHAR in numbers:
								            raise error.PyAsn1Error('Must be UTC time: %r' % value)

								        # `not numbers` keeps an empty value from escaping as a bare
								        # IndexError out of numbers[-1]
								        if not numbers or numbers[-1] != self.Z_CHAR:
								            raise error.PyAsn1Error('Missing "Z" time zone specifier: %r' % value)

								        if self.COMMA_CHAR in numbers:
								            raise error.PyAsn1Error('Comma in fractions disallowed: %r' % value)

								        if self.DOT_CHAR in numbers:
								            numbers = list(numbers)

								            dotIndex = numbers.index(self.DOT_CHAR)
								            zIndex = len(numbers) - 1

								            # Walk back from the trailing 'Z' over *trailing* zeros only.
								            # Zeros that precede a non-zero fraction digit are significant
								            # (.099 is not .99) and must be kept.
								            cutIndex = zIndex
								            while cutIndex > dotIndex + 1 and numbers[cutIndex - 1] == self.ZERO_CHAR:
								                cutIndex -= 1

								            if cutIndex == dotIndex + 1:
								                # nothing is left of the fraction: drop the hanging dot
								                cutIndex = dotIndex

								            if cutIndex != zIndex:
								                del numbers[cutIndex:zIndex]
								                value = value.clone(numbers)

								        if not self.MIN_LENGTH < len(numbers) < self.MAX_LENGTH:
								            raise error.PyAsn1Error('Length constraint violated: %r' % value)

								        options.update(maxChunkSize=1000)

								        return encoder.OctetStringEncoder.encodeValue(
								            self, value, asn1Spec, encodeFun, **options
								        )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class GeneralizedTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
								    """CER encoder for GeneralizedTime values."""

								    # Exclusive bounds checked by TimeEncoderMixIn.encodeValue:
								    # the normalised value must be 13 to 19 characters long.
								    MIN_LENGTH, MAX_LENGTH = 12, 20
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class UTCTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
								    """CER encoder for UTCTime values."""

								    # Exclusive bounds checked by TimeEncoderMixIn.encodeValue:
								    # the normalised value must be 11 to 13 characters long.
								    MIN_LENGTH, MAX_LENGTH = 10, 14
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class SetOfEncoder(encoder.SequenceOfEncoder):
								    """CER encoder for SET OF: encoded components are emitted in sorted order."""

								    def encodeValue(self, value, asn1Spec, encodeFun, **options):
								        """Encode every component, sort the encodings, and join them.

								        When there is more than one component, the encodings are
								        ordered by their content right-padded with zero octets to the
								        length of the longest one. Only the ordering uses the padding;
								        the unpadded encodings are what gets joined.

								        Returns a 3-tuple ``(joined encodings, True, True)``.
								        """
								        encoded = self._encodeComponents(
								            value, asn1Spec, encodeFun, **options)

								        if len(encoded) > 1:
								            padOctet = str2octs('\x00')
								            width = max(len(item) for item in encoded)

								            # sorted() is stable, so equal padded keys keep their order
								            encoded = sorted(
								                encoded, key=lambda item: item.ljust(width, padOctet)
								            )

								        return null.join(encoded), True, True
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class SequenceOfEncoder(encoder.SequenceOfEncoder):
								    """CER encoder for SEQUENCE OF that honours the ``ifNotEmpty`` option."""

								    def encodeValue(self, value, asn1Spec, encodeFun, **options):
								        """Encode the components of `value` in order and join them.

								        If the ``ifNotEmpty`` option is truthy and `value` has no
								        items, an empty substrate is returned without encoding anything.

								        Returns a 3-tuple ``(substrate, True, True)``.
								        """
								        skipWhenEmpty = options.get('ifNotEmpty', False)

								        if skipWhenEmpty and len(value) == 0:
								            return null, True, True

								        encoded = self._encodeComponents(
								            value, asn1Spec, encodeFun, **options)

								        return null.join(encoded), True, True
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class SetEncoder(encoder.SequenceEncoder):
								    """CER encoder for SET: components are emitted sorted by tag."""

								    @staticmethod
								    def _componentSortKey(componentAndType):
								        """Sort SET components by tag

								        Sort regardless of the Choice value (static sort)

								        `componentAndType` is a ``(component, asn1Spec)`` pair; when
								        `asn1Spec` is None the component itself supplies the tag.
								        """
								        component, asn1Spec = componentAndType

								        if asn1Spec is None:
								            asn1Spec = component

								        # an untagged CHOICE has no tag of its own: order it by the
								        # smallest tag among its alternatives
								        if asn1Spec.typeId == univ.Choice.typeId and not asn1Spec.tagSet:
								            return asn1Spec.componentType.minTagSet

								        return asn1Spec.tagSet

								    def encodeValue(self, value, asn1Spec, encodeFun, **options):
								        """Encode the components of a SET in tag order.

								        Parameters
								        ----------
								        value:
								            A pyasn1 SET object when `asn1Spec` is None, otherwise a
								            mapping from component name to bare Python value.
								        asn1Spec:
								            Optional schema describing `value`, or None.
								        encodeFun:
								            Callable used to encode each component.
								        options:
								            Encoding options; ``ifNotEmpty`` is overwritten per named
								            component with that component's optional flag.

								        Returns
								        -------
								        A 3-tuple ``(substrate, True, True)``.

								        Raises
								        ------
								        ~pyasn1.error.PyAsn1Error
								            If a non-optional component is missing from `value`, or
								            whatever `value.isInconsistent` yields when it is truthy.
								        """
								        substrate = null

								        # (component, component schema, NamedType or None) triples. The
								        # NamedType travels with its component instead of being looked
								        # up by id(): two components that are the same Python object
								        # (e.g. equal small ints) would otherwise share one map entry.
								        comps = []

								        if asn1Spec is None:
								            # instance of ASN.1 schema
								            inconsistency = value.isInconsistent
								            if inconsistency:
								                raise inconsistency

								            namedTypes = value.componentType

								            for idx, component in enumerate(value.values()):
								                namedType = None

								                if namedTypes:
								                    namedType = namedTypes[idx]

								                    if namedType.isOptional and not component.isValue:
								                        continue

								                    if namedType.isDefaulted and component == namedType.asn1Object:
								                        continue

								                comps.append((component, asn1Spec, namedType))

								        else:
								            # bare Python value + ASN.1 schema
								            for idx, namedType in enumerate(asn1Spec.componentType.namedTypes):

								                try:
								                    component = value[namedType.name]

								                except KeyError:
								                    # an absent OPTIONAL component is simply omitted
								                    if namedType.isOptional:
								                        continue

								                    raise error.PyAsn1Error('Component name "%s" not found in %r' % (namedType.name, value))

								                if namedType.isOptional and namedType.name not in value:
								                    continue

								                if namedType.isDefaulted and component == namedType.asn1Object:
								                    continue

								                comps.append((component, asn1Spec[idx], namedType))

								        # _componentSortKey keeps its (component, asn1Spec) pair interface
								        ordered = sorted(
								            comps, key=lambda item: self._componentSortKey(item[:2])
								        )

								        for comp, compType, namedType in ordered:
								            if namedType:
								                options.update(ifNotEmpty=namedType.isOptional)

								            chunk = encodeFun(comp, compType, **options)

								            # wrap open type blob if needed
								            if namedType and namedType.openType:
								                wrapType = namedType.asn1Object
								                if wrapType.tagSet and not wrapType.isSameTypeWith(comp):
								                    chunk = encodeFun(chunk, wrapType, **options)

								            substrate += chunk

								        return substrate, True, True
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class SequenceEncoder(encoder.SequenceEncoder):
								    """CER variant of the SEQUENCE encoder.

								    Only overrides one class-level switch; all encoding logic is
								    inherited.
								    """

								    # NOTE(review): presumably read by the inherited encoder to leave out
								    # optional components that are empty -- confirm against the base class.
								    omitEmptyOptionals = True
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# Start from a copy of the BER table and override the CER-specific entries.
								tagMap = encoder.tagMap.copy()
								tagMap[univ.Boolean.tagSet] = BooleanEncoder()
								tagMap[univ.Real.tagSet] = RealEncoder()
								tagMap[useful.GeneralizedTime.tagSet] = GeneralizedTimeEncoder()
								tagMap[useful.UTCTime.tagSet] = UTCTimeEncoder()
								# Sequence & Set have same tags as SequenceOf & SetOf
								tagMap[univ.SetOf.tagSet] = SetOfEncoder()
								# NOTE(review): this entry is keyed by typeId although every other key in
								# this table is a tagSet -- confirm that is intended.
								tagMap[univ.Sequence.typeId] = SequenceEncoder()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# Start from a copy of the BER table and override the CER-specific entries.
								typeMap = encoder.typeMap.copy()
								typeMap[univ.Boolean.typeId] = BooleanEncoder()
								typeMap[univ.Real.typeId] = RealEncoder()
								typeMap[useful.GeneralizedTime.typeId] = GeneralizedTimeEncoder()
								typeMap[useful.UTCTime.typeId] = UTCTimeEncoder()
								# Sequence & Set have same tags as SequenceOf & SetOf
								typeMap[univ.Set.typeId] = SetEncoder()
								typeMap[univ.SetOf.typeId] = SetOfEncoder()
								typeMap[univ.Sequence.typeId] = SequenceEncoder()
								typeMap[univ.SequenceOf.typeId] = SequenceOfEncoder()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class Encoder(encoder.Encoder):
								    """CER flavour of the BER encoder; only two class settings differ."""

								    # NOTE(review): presumably makes the base encoder split long values
								    # into chunks of at most 1000 octets -- confirm against the base class.
								    fixedChunkSize = 1000

								    # NOTE(review): presumably disables definite-length mode; the module's
								    # own examples show output with the 0x80 ... 0x00 0x00 framing -- confirm.
								    fixedDefLengthMode = False
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								#: Turns ASN.1 object into CER octet stream.
							 | 
						||
| 
								 | 
							
								#:
							 | 
						||
| 
								 | 
							
								#: Takes any ASN.1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
							 | 
						||
| 
								 | 
							
								#: walks all its components recursively and produces a CER octet stream.
							 | 
						||
| 
								 | 
							
								#:
							 | 
						||
| 
								 | 
							
								#: Parameters
							 | 
						||
| 
								 | 
							
								#: ----------
							 | 
						||
| 
								 | 
							
								#: value: either a Python or pyasn1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
							 | 
						||
| 
								 | 
							
								#:     A Python or pyasn1 object to encode. If Python object is given, `asn1Spec`
							 | 
						||
| 
								 | 
							
								#:     parameter is required to guide the encoding process.
							 | 
						||
| 
								 | 
							
								#:
							 | 
						||
| 
								 | 
							
								#: Keyword Args
							 | 
						||
| 
								 | 
							
								#: ------------
							 | 
						||
| 
								 | 
							
								#: asn1Spec:
							 | 
						||
| 
								 | 
							
								#:     Optional ASN.1 schema or value object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
							 | 
						||
| 
								 | 
							
								#:
							 | 
						||
| 
								 | 
							
								#: Returns
							 | 
						||
| 
								 | 
							
								#: -------
							 | 
						||
| 
								 | 
							
								#: : :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
							 | 
						||
| 
								 | 
							
								#:     Given ASN.1 object encoded into CER octet-stream
							 | 
						||
| 
								 | 
							
								#:
							 | 
						||
| 
								 | 
							
								#: Raises
							 | 
						||
| 
								 | 
							
								#: ------
							 | 
						||
| 
								 | 
							
								#: ~pyasn1.error.PyAsn1Error
							 | 
						||
| 
								 | 
							
								#:     On encoding errors
							 | 
						||
| 
								 | 
							
								#:
							 | 
						||
| 
								 | 
							
								#: Examples
							 | 
						||
| 
								 | 
							
								#: --------
							 | 
						||
| 
								 | 
							
								#: Encode Python value into CER with ASN.1 schema
							 | 
						||
| 
								 | 
							
								#:
							 | 
						||
| 
								 | 
							
								#: .. code-block:: pycon
							 | 
						||
| 
								 | 
							
								#:
							 | 
						||
| 
								 | 
							
								#:    >>> seq = SequenceOf(componentType=Integer())
							 | 
						||
| 
								 | 
							
								#:    >>> encode([1, 2, 3], asn1Spec=seq)
							 | 
						||
| 
								 | 
							
								#:    b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00'
							 | 
						||
| 
								 | 
							
								#:
							 | 
						||
| 
								 | 
							
								#: Encode ASN.1 value object into CER
							 | 
						||
| 
								 | 
							
								#:
							 | 
						||
| 
								 | 
							
								#: .. code-block:: pycon
							 | 
						||
| 
								 | 
							
								#:
							 | 
						||
| 
								 | 
							
								#:    >>> seq = SequenceOf(componentType=Integer())
							 | 
						||
| 
								 | 
							
								#:    >>> seq.extend([1, 2, 3])
							 | 
						||
| 
								 | 
							
								#:    >>> encode(seq)
							 | 
						||
| 
								 | 
							
								#:    b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00'
							 | 
						||
| 
								 | 
							
								#:
							 | 
						||
| 
								 | 
							
								encode = Encoder(tagMap, typeMap)  # the module's public entry point (listed in __all__)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# EncoderFactory queries class instance and builds a map of tags -> encoders
							 |