1 ## This file is part of Scapy
2 ## See http://www.secdev.org/projects/scapy for more informations
3 ## Copyright (C) Arnaud Ebalard <arno@natisbad.org>
4 ## This program is published under a GPLv2 license
6 import os, sys, math, socket, struct, sha, hmac, string, time
7 import random, popen2, tempfile
8 from scapy.utils import strxor
15 from Crypto.PublicKey import *
16 from Crypto.Cipher import *
17 from Crypto.Hash import *
19 # Maximum allowed size in bytes for a certificate file, to avoid
20 # loading huge file when importing a cert
23 MAX_CRL_SIZE=10*1024*1024 # some are that big
25 #####################################################################
27 #####################################################################
30 print "WARNING: %s" % m
34 Returns a random string of length l (l >= 0)
36 tmp = map(lambda x: struct.pack("B", random.randrange(0, 256, 1)), [""]*l)
39 def zerofree_randstring(l):
41 Returns a random string of length l (l >= 0) without zero in it.
43 tmp = map(lambda x: struct.pack("B", random.randrange(1, 256, 1)), [""]*l)
48 Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2
49 must be of same length.
51 return "".join(map(lambda x,y:chr(ord(x)&ord(y)), s1, s2))
53 # OS2IP function defined in RFC 3447 for octet string to integer conversion
56 Accepts a byte string as input parameter and return the associated long
59 Input : x octet string to be converted
61 Output: x corresponding nonnegative integer
63 Reverse function is pkcs_i2osp()
65 return RSA.number.bytes_to_long(x)
67 # IP2OS function defined in RFC 3447 for octet string to integer conversion
68 def pkcs_i2osp(x,xLen):
70 Converts a long (the first parameter) to the associated byte string
71 representation of length l (second parameter). Basically, the length
72 parameters allow the function to perform the associated padding.
74 Input : x nonnegative integer to be converted
75 xLen intended length of the resulting octet string
77 Output: x corresponding nonnegative integer
79 Reverse function is pkcs_os2ip().
81 z = RSA.number.long_to_bytes(x)
82 padlen = max(0, xLen-len(z))
83 return '\x00'*padlen + z
85 # for every hash function a tuple is provided, giving access to
86 # - hash output length in byte
87 # - associated hash function that take data to be hashed as parameter
88 # XXX I do not provide update() at the moment.
89 # - DER encoding of the leading bits of digestInfo (the hash value
90 # will be concatenated to create the complete digestInfo).
93 # - MD4 asn.1 value should be verified. Also, as stated in
94 # PKCS#1 v2.1, MD4 should not be used.
95 # - hashlib is available from http://code.krypto.org/python/hashlib/
96 # - 'tls' one is the concatenation of both md5 and sha1 hashes used
97 # by SSL/TLS when signing/verifying things
100 lambda x: MD2.new(x).digest(),
101 '\x30\x20\x30\x0c\x06\x08\x2a\x86\x48\x86\xf7\x0d\x02\x02\x05\x00\x04\x10'),
103 lambda x: MD4.new(x).digest(),
104 '\x30\x20\x30\x0c\x06\x08\x2a\x86\x48\x86\xf7\x0d\x02\x04\x05\x00\x04\x10'), # is that right ?
106 lambda x: MD5.new(x).digest(),
107 '\x30\x20\x30\x0c\x06\x08\x2a\x86\x48\x86\xf7\x0d\x02\x05\x05\x00\x04\x10'),
109 lambda x: SHA.new(x).digest(),
110 '\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14'),
112 lambda x: MD5.new(x).digest() + SHA.new(x).digest(),
116 _hashFuncParams["sha224"] = (28,
117 lambda x: hashlib.sha224(x).digest(),
118 '\x30\x2d\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x04\x05\x00\x04\x1c')
119 _hashFuncParams["sha256"] = (32,
120 lambda x: hashlib.sha256(x).digest(),
121 '\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04\x20')
122 _hashFuncParams["sha384"] = (48,
123 lambda x: hashlib.sha384(x).digest(),
124 '\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x02\x05\x00\x04\x30')
125 _hashFuncParams["sha512"] = (64,
126 lambda x: hashlib.sha512(x).digest(),
127 '\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x03\x05\x00\x04\x40')
129 warning("hashlib support is not available. Consider installing it")
130 warning("if you need sha224, sha256, sha384 and sha512 algs.")
132 def pkcs_mgf1(mgfSeed, maskLen, h):
134 Implements generic MGF1 Mask Generation function as described in
135 Appendix B.2.1 of RFC 3447. The hash function is passed by name.
136 valid values are 'md2', 'md4', 'md5', 'sha1', 'tls, 'sha256',
137 'sha384' and 'sha512'. Returns None on error.
140 mgfSeed: seed from which mask is generated, an octet string
141 maskLen: intended length in octets of the mask, at most 2^32 * hLen
143 h : hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls',
144 'sha256', 'sha384'). hLen denotes the length in octets of
145 the hash function output.
148 an octet string of length maskLen
151 # steps are those of Appendix B.2.1
152 if not _hashFuncParams.has_key(h):
153 warning("pkcs_mgf1: invalid hash (%s) provided")
155 hLen = _hashFuncParams[h][0]
156 hFunc = _hashFuncParams[h][1]
157 if maskLen > 2**32 * hLen: # 1)
158 warning("pkcs_mgf1: maskLen > 2**32 * hLen")
161 maxCounter = math.ceil(float(maskLen) / float(hLen)) # 3)
163 while counter < maxCounter:
164 C = pkcs_i2osp(counter, 4)
165 T += hFunc(mgfSeed + C)
170 def pkcs_emsa_pss_encode(M, emBits, h, mgf, sLen):
172 Implements EMSA-PSS-ENCODE() function described in Sect. 9.1.1 of RFC 3447
175 M : message to be encoded, an octet string
176 emBits: maximal bit length of the integer resulting of pkcs_os2ip(EM),
177 where EM is the encoded message, output of the function.
178 h : hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls',
179 'sha256', 'sha384'). hLen denotes the length in octets of
180 the hash function output.
181 mgf : the mask generation function f : seed, maskLen -> mask
182 sLen : intended length in octets of the salt
185 encoded message, an octet string of length emLen = ceil(emBits/8)
187 On error, None is returned.
191 hLen = _hashFuncParams[h][0] # 2)
192 hFunc = _hashFuncParams[h][1]
194 emLen = int(math.ceil(emBits/8.))
195 if emLen < hLen + sLen + 2: # 3)
196 warning("encoding error (emLen < hLen + sLen + 2)")
198 salt = randstring(sLen) # 4)
199 MPrime = '\x00'*8 + mHash + salt # 5)
200 H = hFunc(MPrime) # 6)
201 PS = '\x00'*(emLen - sLen - hLen - 2) # 7)
202 DB = PS + '\x01' + salt # 8)
203 dbMask = mgf(H, emLen - hLen - 1) # 9)
204 maskedDB = strxor(DB, dbMask) # 10)
205 l = (8*emLen - emBits)/8 # 11)
206 rem = 8*emLen - emBits - 8*l # additionnal bits
209 j = chr(reduce(lambda x,y: x+y, map(lambda x: 1<<x, range(8-rem))))
212 maskedDB = strand(maskedDB[:l], andMask) + maskedDB[l:]
213 EM = maskedDB + H + '\xbc' # 12)
217 def pkcs_emsa_pss_verify(M, EM, emBits, h, mgf, sLen):
219 Implements EMSA-PSS-VERIFY() function described in Sect. 9.1.2 of RFC 3447
222 M : message to be encoded, an octet string
223 EM : encoded message, an octet string of length emLen = ceil(emBits/8)
224 emBits: maximal bit length of the integer resulting of pkcs_os2ip(EM)
225 h : hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls',
226 'sha256', 'sha384'). hLen denotes the length in octets of
227 the hash function output.
228 mgf : the mask generation function f : seed, maskLen -> mask
229 sLen : intended length in octets of the salt
232 True if the verification is ok, False otherwise.
236 hLen = _hashFuncParams[h][0] # 2)
237 hFunc = _hashFuncParams[h][1]
239 emLen = int(math.ceil(emBits/8.)) # 3)
240 if emLen < hLen + sLen + 2:
242 if EM[-1] != '\xbc': # 4)
244 l = emLen - hLen - 1 # 5)
247 l = (8*emLen - emBits)/8 # 6)
248 rem = 8*emLen - emBits - 8*l # additionnal bits
251 val = reduce(lambda x,y: x+y, map(lambda x: 1<<x, range(8-rem)))
255 if strand(maskedDB[:l], andMask) != '\x00'*l:
257 dbMask = mgf(H, emLen - hLen - 1) # 7)
258 DB = strxor(maskedDB, dbMask) # 8)
259 l = (8*emLen - emBits)/8 # 9)
260 rem = 8*emLen - emBits - 8*l # additionnal bits
263 j = chr(reduce(lambda x,y: x+y, map(lambda x: 1<<x, range(8-rem))))
266 DB = strand(DB[:l], andMask) + DB[l:]
267 l = emLen - hLen - sLen - 1 # 10)
268 if DB[:l] != '\x00'*(l-1) + '\x01':
270 salt = DB[-sLen:] # 11)
271 MPrime = '\x00'*8 + mHash + salt # 12)
272 HPrime = hFunc(MPrime) # 13)
273 return H == HPrime # 14)
276 def pkcs_emsa_pkcs1_v1_5_encode(M, emLen, h): # section 9.2 of RFC 3447
278 Implements EMSA-PKCS1-V1_5-ENCODE() function described in Sect.
282 M : message to be encode, an octet string
283 emLen: intended length in octets of the encoded message, at least
284 tLen + 11, where tLen is the octet length of the DER encoding
285 T of a certain value computed during the encoding operation.
286 h : hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls',
287 'sha256', 'sha384'). hLen denotes the length in octets of
288 the hash function output.
291 encoded message, an octet string of length emLen
293 On error, None is returned.
295 hLen = _hashFuncParams[h][0] # 1)
296 hFunc = _hashFuncParams[h][1]
298 hLeadingDigestInfo = _hashFuncParams[h][2] # 2)
299 T = hLeadingDigestInfo + H
301 if emLen < tLen + 11: # 3)
302 warning("pkcs_emsa_pkcs1_v1_5_encode: intended encoded message length too short")
304 PS = '\xff'*(emLen - tLen - 3) # 4)
305 EM = '\x00' + '\x01' + PS + '\x00' + T # 5)
309 # XXX should add other pgf1 instance in a better fashion.
311 def create_ca_file(anchor_list, filename):
313 Concatenate all the certificates (PEM format for the export) in
314 'anchor_list' and write the result to file 'filename'. On success
315 'filename' is returned, None otherwise.
317 If you are used to OpenSSL tools, this function builds a CAfile
318 that can be used for certificate and CRL check.
320 Also see create_temporary_ca_file().
323 f = open(filename, "w")
324 for a in anchor_list:
325 s = a.output(fmt="PEM")
332 def create_temporary_ca_file(anchor_list):
334 Concatenate all the certificates (PEM format for the export) in
335 'anchor_list' and write the result to file to a temporary file
336 using mkstemp() from tempfile module. On success 'filename' is
337 returned, None otherwise.
339 If you are used to OpenSSL tools, this function builds a CAfile
340 that can be used for certificate and CRL check.
342 Also see create_temporary_ca_file().
345 f, fname = tempfile.mkstemp()
346 for a in anchor_list:
347 s = a.output(fmt="PEM")
354 def create_temporary_ca_path(anchor_list, folder):
356 Create a CA path folder as defined in OpenSSL terminology, by
357 storing all certificates in 'anchor_list' list in PEM format
358 under provided 'folder' and then creating the associated links
359 using the hash as usually done by c_rehash.
361 Note that you can also include CRL in 'anchor_list'. In that
362 case, they will also be stored under 'folder' and associated
363 links will be created.
365 In folder, the files are created with names of the form
366 0...ZZ.pem. If you provide an empty list, folder will be created
367 if it does not already exist, but that's all.
369 The number of certificates written to folder is returned on
370 success, None on error.
372 # We should probably avoid writing duplicate anchors and also
373 # check if they are all certs.
375 if not os.path.isdir(folder):
383 fmtstr = "%%0%sd.pem" % math.ceil(math.log(l, 10))
386 for a in anchor_list:
387 fname = os.path.join(folder, fmtstr % i)
389 s = a.output(fmt="PEM")
396 r,w=popen2.popen2("c_rehash %s" % folder)
402 #####################################################################
403 # Public Key Cryptography related stuff
404 #####################################################################
407 def _apply_ossl_cmd(self, osslcmd, rawdata):
408 r,w=popen2.popen2(osslcmd)
415 class _EncryptAndVerify:
416 ### Below are encryption methods
420 Internal method providing raw RSA encryption, i.e. simple modular
421 exponentiation of the given message representative 'm', a long
424 This is the encryption primitive RSAEP described in PKCS#1 v2.1,
425 i.e. RFC 3447 Sect. 5.1.1.
428 m: message representative, a long between 0 and n-1, where
429 n is the key modulus.
432 ciphertext representative, a long between 0 and n-1
434 Not intended to be used directly. Please, see encrypt() method.
440 if type(m) is not long or m > n-1:
441 warning("Key._rsaep() expects a long between 0 and n-1")
444 return self.key.encrypt(m, "")[0]
447 def _rsaes_pkcs1_v1_5_encrypt(self, M):
449 Implements RSAES-PKCS1-V1_5-ENCRYPT() function described in section
453 M: message to be encrypted, an octet string of length mLen, where
454 mLen <= k - 11 (k denotes the length in octets of the key modulus)
457 ciphertext, an octet string of length k
459 On error, None is returned.
464 k = self.modulusLen / 8
466 warning("Key._rsaes_pkcs1_v1_5_encrypt(): message too "
467 "long (%d > %d - 11)" % (mLen, k))
470 # 2) EME-PKCS1-v1_5 encoding
471 PS = zerofree_randstring(k - mLen - 3) # 2.a)
472 EM = '\x00' + '\x02' + PS + '\x00' + M # 2.b)
475 m = pkcs_os2ip(EM) # 3.a)
476 c = self._rsaep(m) # 3.b)
477 C = pkcs_i2osp(c, k) # 3.c)
482 def _rsaes_oaep_encrypt(self, M, h=None, mgf=None, L=None):
484 Internal method providing RSAES-OAEP-ENCRYPT as defined in Sect.
485 7.1.1 of RFC 3447. Not intended to be used directly. Please, see
486 encrypt() method for type "OAEP".
490 M : message to be encrypted, an octet string of length mLen
491 where mLen <= k - 2*hLen - 2 (k denotes the length in octets
492 of the RSA modulus and hLen the length in octets of the hash
494 h : hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls',
495 'sha256', 'sha384'). hLen denotes the length in octets of
496 the hash function output. 'sha1' is used by default if not
498 mgf: the mask generation function f : seed, maskLen -> mask
499 L : optional label to be associated with the message; the default
500 value for L, if not provided is the empty string
503 ciphertext, an octet string of length k
505 On error, None is returned.
507 # The steps below are the one described in Sect. 7.1.1 of RFC 3447.
513 if not _hashFuncParams.has_key(h):
514 warning("Key._rsaes_oaep_encrypt(): unknown hash function %s.", h)
516 hLen = _hashFuncParams[h][0]
517 hFun = _hashFuncParams[h][1]
518 k = self.modulusLen / 8
519 if mLen > k - 2*hLen - 2: # 1.b)
520 warning("Key._rsaes_oaep_encrypt(): message too long.")
523 # 2) EME-OAEP encoding
527 PS = '\x00'*(k - mLen - 2*hLen - 2) # 2.b)
528 DB = lHash + PS + '\x01' + M # 2.c)
529 seed = randstring(hLen) # 2.d)
530 if mgf is None: # 2.e)
531 mgf = lambda x,y: pkcs_mgf1(x,y,h)
532 dbMask = mgf(seed, k - hLen - 1)
533 maskedDB = strxor(DB, dbMask) # 2.f)
534 seedMask = mgf(maskedDB, hLen) # 2.g)
535 maskedSeed = strxor(seed, seedMask) # 2.h)
536 EM = '\x00' + maskedSeed + maskedDB # 2.i)
539 m = pkcs_os2ip(EM) # 3.a)
540 c = self._rsaep(m) # 3.b)
541 C = pkcs_i2osp(c, k) # 3.c)
546 def encrypt(self, m, t=None, h=None, mgf=None, L=None):
548 Encrypt message 'm' using 't' encryption scheme where 't' can be:
550 - None: the message 'm' is directly applied the RSAEP encryption
551 primitive, as described in PKCS#1 v2.1, i.e. RFC 3447
552 Sect 5.1.1. Simply put, the message undergo a modular
553 exponentiation using the public key. Additionnal method
554 parameters are just ignored.
556 - 'pkcs': the message 'm' is applied RSAES-PKCS1-V1_5-ENCRYPT encryption
557 scheme as described in section 7.2.1 of RFC 3447. In that
558 context, other parameters ('h', 'mgf', 'l') are not used.
560 - 'oaep': the message 'm' is applied the RSAES-OAEP-ENCRYPT encryption
561 scheme, as described in PKCS#1 v2.1, i.e. RFC 3447 Sect
562 7.1.1. In that context,
564 o 'h' parameter provides the name of the hash method to use.
565 Possible values are "md2", "md4", "md5", "sha1", "tls",
566 "sha224", "sha256", "sha384" and "sha512". if none is provided,
569 o 'mgf' is the mask generation function. By default, mgf
570 is derived from the provided hash function using the
571 generic MGF1 (see pkcs_mgf1() for details).
573 o 'L' is the optional label to be associated with the
574 message. If not provided, the default value is used, i.e
575 the empty string. No check is done on the input limitation
576 of the hash function regarding the size of 'L' (for
577 instance, 2^61 - 1 for SHA-1). You have been warned.
580 if t is None: # Raw encryption
583 return pkcs_i2osp(c, self.modulusLen/8)
586 return self._rsaes_pkcs1_v1_5_encrypt(m)
589 return self._rsaes_oaep_encrypt(m, h, mgf, L)
592 warning("Key.encrypt(): Unknown encryption type (%s) provided" % t)
595 ### Below are verification related methods
597 def _rsavp1(self, s):
599 Internal method providing raw RSA verification, i.e. simple modular
600 exponentiation of the given signature representative 'c', an integer
603 This is the signature verification primitive RSAVP1 described in
604 PKCS#1 v2.1, i.e. RFC 3447 Sect. 5.2.2.
607 s: signature representative, an integer between 0 and n-1,
608 where n is the key modulus.
611 message representative, an integer between 0 and n-1
613 Not intended to be used directly. Please, see verify() method.
615 return self._rsaep(s)
617 def _rsassa_pss_verify(self, M, S, h=None, mgf=None, sLen=None):
619 Implements RSASSA-PSS-VERIFY() function described in Sect 8.1.2
623 M: message whose signature is to be verified
624 S: signature to be verified, an octet string of length k, where k
625 is the length in octets of the RSA modulus n.
628 True is the signature is valid. False otherwise.
631 # Set default parameters if not provided
632 if h is None: # By default, sha1
634 if not _hashFuncParams.has_key(h):
635 warning("Key._rsassa_pss_verify(): unknown hash function "
638 if mgf is None: # use mgf1 with underlying hash function
639 mgf = lambda x,y: pkcs_mgf1(x, y, h)
640 if sLen is None: # use Hash output length (A.2.3 of RFC 3447)
641 hLen = _hashFuncParams[h][0]
645 modBits = self.modulusLen
650 # 2) RSA verification
651 s = pkcs_os2ip(S) # 2.a)
652 m = self._rsavp1(s) # 2.b)
653 emLen = math.ceil((modBits - 1) / 8.) # 2.c)
654 EM = pkcs_i2osp(m, emLen)
656 # 3) EMSA-PSS verification
657 Result = pkcs_emsa_pss_verify(M, EM, modBits - 1, h, mgf, sLen)
662 def _rsassa_pkcs1_v1_5_verify(self, M, S, h):
664 Implements RSASSA-PKCS1-v1_5-VERIFY() function as described in
665 Sect. 8.2.2 of RFC 3447.
668 M: message whose signature is to be verified, an octet string
669 S: signature to be verified, an octet string of length k, where
670 k is the length in octets of the RSA modulus n
671 h: hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls',
675 True if the signature is valid. False otherwise.
679 k = self.modulusLen / 8
681 warning("invalid signature (len(S) != k)")
684 # 2) RSA verification
685 s = pkcs_os2ip(S) # 2.a)
686 m = self._rsavp1(s) # 2.b)
687 EM = pkcs_i2osp(m, k) # 2.c)
689 # 3) EMSA-PKCS1-v1_5 encoding
690 EMPrime = pkcs_emsa_pkcs1_v1_5_encode(M, k, h)
692 warning("Key._rsassa_pkcs1_v1_5_verify(): unable to encode.")
699 def verify(self, M, S, t=None, h=None, mgf=None, sLen=None):
701 Verify alleged signature 'S' is indeed the signature of message 'M' using
702 't' signature scheme where 't' can be:
704 - None: the alleged signature 'S' is directly applied the RSAVP1 signature
705 primitive, as described in PKCS#1 v2.1, i.e. RFC 3447 Sect
706 5.2.1. Simply put, the provided signature is applied a moular
707 exponentiation using the public key. Then, a comparison of the
708 result is done against 'M'. On match, True is returned.
709 Additionnal method parameters are just ignored.
711 - 'pkcs': the alleged signature 'S' and message 'M' are applied
712 RSASSA-PKCS1-v1_5-VERIFY signature verification scheme as
713 described in Sect. 8.2.2 of RFC 3447. In that context,
714 the hash function name is passed using 'h'. Possible values are
715 "md2", "md4", "md5", "sha1", "tls", "sha224", "sha256", "sha384"
716 and "sha512". If none is provided, sha1 is used. Other additionnal
717 parameters are ignored.
719 - 'pss': the alleged signature 'S' and message 'M' are applied
720 RSASSA-PSS-VERIFY signature scheme as described in Sect. 8.1.2.
721 of RFC 3447. In that context,
723 o 'h' parameter provides the name of the hash method to use.
724 Possible values are "md2", "md4", "md5", "sha1", "tls", "sha224",
725 "sha256", "sha384" and "sha512". if none is provided, sha1
728 o 'mgf' is the mask generation function. By default, mgf
729 is derived from the provided hash function using the
730 generic MGF1 (see pkcs_mgf1() for details).
732 o 'sLen' is the length in octet of the salt. You can overload the
733 default value (the octet length of the hash value for provided
734 algorithm) by providing another one with that parameter.
736 if t is None: # RSAVP1
740 warning("Signature to be verified is too long for key modulus")
745 l = int(math.ceil(math.log(m, 2) / 8.)) # Hack
749 elif t == "pkcs": # RSASSA-PKCS1-v1_5-VERIFY
752 return self._rsassa_pkcs1_v1_5_verify(M, S, h)
754 elif t == "pss": # RSASSA-PSS-VERIFY
755 return self._rsassa_pss_verify(M, S, h, mgf, sLen)
758 warning("Key.verify(): Unknown signature type (%s) provided" % t)
761 class _DecryptAndSignMethods(OSSLHelper):
762 ### Below are decryption related methods. Encryption ones are inherited
767 Internal method providing raw RSA decryption, i.e. simple modular
768 exponentiation of the given ciphertext representative 'c', a long
771 This is the decryption primitive RSADP described in PKCS#1 v2.1,
772 i.e. RFC 3447 Sect. 5.1.2.
775 c: ciphertest representative, a long between 0 and n-1, where
776 n is the key modulus.
779 ciphertext representative, a long between 0 and n-1
781 Not intended to be used directly. Please, see encrypt() method.
787 if type(c) is not long or c > n-1:
788 warning("Key._rsaep() expects a long between 0 and n-1")
791 return self.key.decrypt(c)
794 def _rsaes_pkcs1_v1_5_decrypt(self, C):
796 Implements RSAES-PKCS1-V1_5-DECRYPT() function described in section
800 C: ciphertext to be decrypted, an octet string of length k, where
801 k is the length in octets of the RSA modulus n.
804 an octet string of length k at most k - 11
806 on error, None is returned.
811 k = self.modulusLen / 8
812 if cLen != k or k < 11:
813 warning("Key._rsaes_pkcs1_v1_5_decrypt() decryption error "
814 "(cLen != k or k < 11)")
818 c = pkcs_os2ip(C) # 2.a)
819 m = self._rsadp(c) # 2.b)
820 EM = pkcs_i2osp(m, k) # 2.c)
822 # 3) EME-PKCS1-v1_5 decoding
824 # I am aware of the note at the end of 7.2.2 regarding error
825 # conditions reporting but the one provided below are for _local_
826 # debugging purposes. --arno
829 warning("Key._rsaes_pkcs1_v1_5_decrypt(): decryption error "
830 "(first byte is not 0x00)")
834 warning("Key._rsaes_pkcs1_v1_5_decrypt(): decryption error "
835 "(second byte is not 0x02)")
838 tmp = EM[2:].split('\x00', 1)
840 warning("Key._rsaes_pkcs1_v1_5_decrypt(): decryption error "
841 "(no 0x00 to separate PS from M)")
846 warning("Key._rsaes_pkcs1_v1_5_decrypt(): decryption error "
847 "(PS is less than 8 byte long)")
853 def _rsaes_oaep_decrypt(self, C, h=None, mgf=None, L=None):
855 Internal method providing RSAES-OAEP-DECRYPT as defined in Sect.
856 7.1.2 of RFC 3447. Not intended to be used directly. Please, see
857 encrypt() method for type "OAEP".
861 C : ciphertext to be decrypted, an octet string of length k, where
862 k = 2*hLen + 2 (k denotes the length in octets of the RSA modulus
863 and hLen the length in octets of the hash function output)
864 h : hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls',
865 'sha256', 'sha384'). 'sha1' is used if none is provided.
866 mgf: the mask generation function f : seed, maskLen -> mask
867 L : optional label whose association with the message is to be
868 verified; the default value for L, if not provided is the empty
872 message, an octet string of length k mLen, where mLen <= k - 2*hLen - 2
874 On error, None is returned.
876 # The steps below are the one described in Sect. 7.1.2 of RFC 3447.
882 if not _hashFuncParams.has_key(h):
883 warning("Key._rsaes_oaep_decrypt(): unknown hash function %s.", h)
885 hLen = _hashFuncParams[h][0]
886 hFun = _hashFuncParams[h][1]
887 k = self.modulusLen / 8
890 warning("Key._rsaes_oaep_decrypt(): decryption error. "
894 warning("Key._rsaes_oaep_decrypt(): decryption error. "
899 c = pkcs_os2ip(C) # 2.a)
900 m = self._rsadp(c) # 2.b)
901 EM = pkcs_i2osp(m, k) # 2.c)
903 # 3) EME-OAEP decoding
909 warning("Key._rsaes_oaep_decrypt(): decryption error. "
912 maskedSeed = EM[1:1+hLen]
913 maskedDB = EM[1+hLen:]
915 mgf = lambda x,y: pkcs_mgf1(x, y, h)
916 seedMask = mgf(maskedDB, hLen) # 3.c)
917 seed = strxor(maskedSeed, seedMask) # 3.d)
918 dbMask = mgf(seed, k - hLen - 1) # 3.e)
919 DB = strxor(maskedDB, dbMask) # 3.f)
921 # I am aware of the note at the end of 7.1.2 regarding error
922 # conditions reporting but the one provided below are for _local_
923 # debugging purposes. --arno
925 lHashPrime = DB[:hLen] # 3.g)
926 tmp = DB[hLen:].split('\x01', 1)
928 warning("Key._rsaes_oaep_decrypt(): decryption error. "
929 "(0x01 separator not found)")
932 if PS != '\x00'*len(PS):
933 warning("Key._rsaes_oaep_decrypt(): decryption error. "
934 "(invalid padding string)")
936 if lHash != lHashPrime:
937 warning("Key._rsaes_oaep_decrypt(): decryption error. "
943 def decrypt(self, C, t=None, h=None, mgf=None, L=None):
945 Decrypt ciphertext 'C' using 't' decryption scheme where 't' can be:
947 - None: the ciphertext 'C' is directly applied the RSADP decryption
948 primitive, as described in PKCS#1 v2.1, i.e. RFC 3447
949 Sect 5.1.2. Simply, put the message undergo a modular
950 exponentiation using the private key. Additionnal method
951 parameters are just ignored.
953 - 'pkcs': the ciphertext 'C' is applied RSAES-PKCS1-V1_5-DECRYPT
954 decryption scheme as described in section 7.2.2 of RFC 3447.
955 In that context, other parameters ('h', 'mgf', 'l') are not
958 - 'oaep': the ciphertext 'C' is applied the RSAES-OAEP-DECRYPT decryption
959 scheme, as described in PKCS#1 v2.1, i.e. RFC 3447 Sect
960 7.1.2. In that context,
962 o 'h' parameter provides the name of the hash method to use.
963 Possible values are "md2", "md4", "md5", "sha1", "tls",
964 "sha224", "sha256", "sha384" and "sha512". if none is provided,
965 sha1 is used by default.
967 o 'mgf' is the mask generation function. By default, mgf
968 is derived from the provided hash function using the
969 generic MGF1 (see pkcs_mgf1() for details).
971 o 'L' is the optional label to be associated with the
972 message. If not provided, the default value is used, i.e
973 the empty string. No check is done on the input limitation
974 of the hash function regarding the size of 'L' (for
975 instance, 2^61 - 1 for SHA-1). You have been warned.
980 l = int(math.ceil(math.log(c, 2) / 8.)) # Hack
981 return pkcs_i2osp(c, l)
984 return self._rsaes_pkcs1_v1_5_decrypt(C)
987 return self._rsaes_oaep_decrypt(C, h, mgf, L)
990 warning("Key.decrypt(): Unknown decryption type (%s) provided" % t)
993 ### Below are signature related methods. Verification ones are inherited from
996 def _rsasp1(self, m):
998 Internal method providing raw RSA signature, i.e. simple modular
999 exponentiation of the given message representative 'm', an integer
1002 This is the signature primitive RSASP1 described in PKCS#1 v2.1,
1003 i.e. RFC 3447 Sect. 5.2.1.
1006 m: message representative, an integer between 0 and n-1, where
1007 n is the key modulus.
1010 signature representative, an integer between 0 and n-1
1012 Not intended to be used directly. Please, see sign() method.
1014 return self._rsadp(m)
1017 def _rsassa_pss_sign(self, M, h=None, mgf=None, sLen=None):
1019 Implements RSASSA-PSS-SIGN() function described in Sect. 8.1.1 of
1023 M: message to be signed, an octet string
1026 signature, an octet string of length k, where k is the length in
1027 octets of the RSA modulus n.
1029 On error, None is returned.
1032 # Set default parameters if not provided
1033 if h is None: # By default, sha1
1035 if not _hashFuncParams.has_key(h):
1036 warning("Key._rsassa_pss_sign(): unknown hash function "
1037 "provided (%s)" % h)
1039 if mgf is None: # use mgf1 with underlying hash function
1040 mgf = lambda x,y: pkcs_mgf1(x, y, h)
1041 if sLen is None: # use Hash output length (A.2.3 of RFC 3447)
1042 hLen = _hashFuncParams[h][0]
1045 # 1) EMSA-PSS encoding
1046 modBits = self.modulusLen
1048 EM = pkcs_emsa_pss_encode(M, modBits - 1, h, mgf, sLen)
1050 warning("Key._rsassa_pss_sign(): unable to encode")
1054 m = pkcs_os2ip(EM) # 2.a)
1055 s = self._rsasp1(m) # 2.b)
1056 S = pkcs_i2osp(s, k) # 2.c)
1061 def _rsassa_pkcs1_v1_5_sign(self, M, h):
1063 Implements RSASSA-PKCS1-v1_5-SIGN() function as described in
1064 Sect. 8.2.1 of RFC 3447.
1067 M: message to be signed, an octet string
1068 h: hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls'
1069 'sha256', 'sha384').
1072 the signature, an octet string.
1075 # 1) EMSA-PKCS1-v1_5 encoding
1076 k = self.modulusLen / 8
1077 EM = pkcs_emsa_pkcs1_v1_5_encode(M, k, h)
1079 warning("Key._rsassa_pkcs1_v1_5_sign(): unable to encode")
1083 m = pkcs_os2ip(EM) # 2.a)
1084 s = self._rsasp1(m) # 2.b)
1085 S = pkcs_i2osp(s, k) # 2.c)
1090 def sign(self, M, t=None, h=None, mgf=None, sLen=None):
1092 Sign message 'M' using 't' signature scheme where 't' can be:
1094 - None: the message 'M' is directly applied the RSASP1 signature
1095 primitive, as described in PKCS#1 v2.1, i.e. RFC 3447 Sect
1096 5.2.1. Simply put, the message undergo a modular exponentiation
1097 using the private key. Additionnal method parameters are just
1100 - 'pkcs': the message 'M' is applied RSASSA-PKCS1-v1_5-SIGN signature
1101 scheme as described in Sect. 8.2.1 of RFC 3447. In that context,
1102 the hash function name is passed using 'h'. Possible values are
1103 "md2", "md4", "md5", "sha1", "tls", "sha224", "sha256", "sha384"
1104 and "sha512". If none is provided, sha1 is used. Other additionnal
1105 parameters are ignored.
1107 - 'pss' : the message 'M' is applied RSASSA-PSS-SIGN signature scheme as
1108 described in Sect. 8.1.1. of RFC 3447. In that context,
1110 o 'h' parameter provides the name of the hash method to use.
1111 Possible values are "md2", "md4", "md5", "sha1", "tls", "sha224",
1112 "sha256", "sha384" and "sha512". if none is provided, sha1
1115 o 'mgf' is the mask generation function. By default, mgf
1116 is derived from the provided hash function using the
1117 generic MGF1 (see pkcs_mgf1() for details).
1119 o 'sLen' is the length in octet of the salt. You can overload the
1120 default value (the octet length of the hash value for provided
1121 algorithm) by providing another one with that parameter.
1124 if t is None: # RSASP1
1128 warning("Message to be signed is too long for key modulus")
1133 return pkcs_i2osp(s, self.modulusLen/8)
1135 elif t == "pkcs": # RSASSA-PKCS1-v1_5-SIGN
1138 return self._rsassa_pkcs1_v1_5_sign(M, h)
1140 elif t == "pss": # RSASSA-PSS-SIGN
1141 return self._rsassa_pss_sign(M, h, mgf, sLen)
1144 warning("Key.sign(): Unknown signature type (%s) provided" % t)
1150 class PubKey(OSSLHelper, _EncryptAndVerify):
1151 # Below are the fields we recognize in the -text output of openssl
1152 # and from which we extract information. We expect them in that
1153 # order. Number of spaces does matter.
1154 possible_fields = [ "Modulus (",
1156 possible_fields_count = len(possible_fields)
1158 def __init__(self, keypath):
1159 error_msg = "Unable to import key."
1161 # XXX Temporary hack to use PubKey inside Cert
1162 if type(keypath) is tuple:
1163 e, m, mLen = keypath
1165 self.modulusLen = mLen
1170 for k in self.possible_fields:
1171 fields_dict[k] = None
1176 if (not '\x00' in keypath) and os.path.isfile(keypath): # file
1177 self.keypath = keypath
1178 key_size = os.path.getsize(keypath)
1179 if key_size > MAX_KEY_SIZE:
1180 raise Exception(error_msg)
1186 raise Exception(error_msg)
1191 raise Exception(error_msg)
1193 self.rawkey = rawkey
1195 # Let's try to get file format : PEM or DER.
1196 fmtstr = 'openssl rsa -text -pubin -inform %s -noout '
1197 convertstr = 'openssl rsa -pubin -inform %s -outform %s 2>/dev/null'
1198 key_header = "-----BEGIN PUBLIC KEY-----"
1199 key_footer = "-----END PUBLIC KEY-----"
1200 l = rawkey.split(key_header, 1)
1201 if len(l) == 2: # looks like PEM
1203 l = tmp.split(key_footer, 1)
1206 rawkey = "%s%s%s\n" % (key_header, tmp, key_footer)
1208 raise Exception(error_msg)
1209 r,w,e = popen2.popen3(fmtstr % "PEM")
1218 self.pemkey = rawkey
1219 self.textkey = textkey
1220 cmd = convertstr % ("PEM", "DER")
1221 self.derkey = self._apply_ossl_cmd(cmd, rawkey)
1223 raise Exception(error_msg)
1224 else: # not PEM, try DER
1225 r,w,e = popen2.popen3(fmtstr % "DER")
1233 self.derkey = rawkey
1234 self.textkey = textkey
1235 cmd = convertstr % ("DER", "PEM")
1236 self.pemkey = self._apply_ossl_cmd(cmd, rawkey)
1237 cmd = convertstr % ("DER", "DER")
1238 self.derkey = self._apply_ossl_cmd(cmd, rawkey)
1240 try: # Perhaps it is a cert
1243 raise Exception(error_msg)
1245 # Reconstruct a key (der and pem) and provide:
1252 self.osslcmdbase = 'openssl rsa -pubin -inform %s ' % self.format
1254 self.keypath = keypath
1256 # Parse the -text output of openssl to make things available
1257 l = self.textkey.split('\n', 1)
1259 raise Exception(error_msg)
1262 k = self.possible_fields[i] # Modulus (
1263 cur = cur[len(k):] + '\n'
1265 l = tmp.split('\n', 1)
1266 if len(l) != 2: # Over
1267 fields_dict[k] = cur
1272 # skip fields we have already seen, this is the purpose of 'i'
1273 for j in range(i, self.possible_fields_count):
1274 f = self.possible_fields[j]
1276 fields_dict[k] = cur
1277 cur = l[len(f):] + '\n'
1286 # modulus and modulus length
1287 v = fields_dict["Modulus ("]
1288 self.modulusLen = None
1290 v, rem = v.split(' bit):', 1)
1291 self.modulusLen = int(v)
1292 rem = rem.replace('\n','').replace(' ','').replace(':','')
1293 self.modulus = long(rem, 16)
1294 if self.modulus is None:
1295 raise Exception(error_msg)
1298 v = fields_dict["Exponent:"]
1301 self.pubExp = long(v.split('(', 1)[0])
1302 if self.pubExp is None:
1303 raise Exception(error_msg)
1305 self.key = RSA.construct((self.modulus, self.pubExp, ))
1311 class Key(OSSLHelper, _DecryptAndSignMethods, _EncryptAndVerify):
1312 # Below are the fields we recognize in the -text output of openssl
1313 # and from which we extract information. We expect them in that
1314 # order. Number of spaces does matter.
1315 possible_fields = [ "Private-Key: (",
1324 possible_fields_count = len(possible_fields)
1326 def __init__(self, keypath):
1327 error_msg = "Unable to import key."
1330 for k in self.possible_fields:
1331 fields_dict[k] = None
1336 if (not '\x00' in keypath) and os.path.isfile(keypath):
1337 self.keypath = keypath
1338 key_size = os.path.getsize(keypath)
1339 if key_size > MAX_KEY_SIZE:
1340 raise Exception(error_msg)
1346 raise Exception(error_msg)
1351 raise Exception(error_msg)
1353 self.rawkey = rawkey
1355 # Let's try to get file format : PEM or DER.
1356 fmtstr = 'openssl rsa -text -inform %s -noout '
1357 convertstr = 'openssl rsa -inform %s -outform %s 2>/dev/null'
1358 key_header = "-----BEGIN RSA PRIVATE KEY-----"
1359 key_footer = "-----END RSA PRIVATE KEY-----"
1360 l = rawkey.split(key_header, 1)
1361 if len(l) == 2: # looks like PEM
1363 l = tmp.split(key_footer, 1)
1366 rawkey = "%s%s%s\n" % (key_header, tmp, key_footer)
1368 raise Exception(error_msg)
1369 r,w,e = popen2.popen3(fmtstr % "PEM")
1378 self.pemkey = rawkey
1379 self.textkey = textkey
1380 cmd = convertstr % ("PEM", "DER")
1381 self.derkey = self._apply_ossl_cmd(cmd, rawkey)
1383 raise Exception(error_msg)
1384 else: # not PEM, try DER
1385 r,w,e = popen2.popen3(fmtstr % "DER")
1393 self.derkey = rawkey
1394 self.textkey = textkey
1395 cmd = convertstr % ("DER", "PEM")
1396 self.pemkey = self._apply_ossl_cmd(cmd, rawkey)
1397 cmd = convertstr % ("DER", "DER")
1398 self.derkey = self._apply_ossl_cmd(cmd, rawkey)
1400 raise Exception(error_msg)
1402 self.osslcmdbase = 'openssl rsa -inform %s ' % self.format
1404 r,w,e = popen2.popen3('openssl asn1parse -inform DER ')
1405 w.write(self.derkey)
1407 self.asn1parsekey = r.read()
1412 raise Exception(error_msg)
1414 self.keypath = keypath
1416 # Parse the -text output of openssl to make things available
1417 l = self.textkey.split('\n', 1)
1419 raise Exception(error_msg)
1422 k = self.possible_fields[i] # Private-Key: (
1423 cur = cur[len(k):] + '\n'
1425 l = tmp.split('\n', 1)
1426 if len(l) != 2: # Over
1427 fields_dict[k] = cur
1432 # skip fields we have already seen, this is the purpose of 'i'
1433 for j in range(i, self.possible_fields_count):
1434 f = self.possible_fields[j]
1436 fields_dict[k] = cur
1437 cur = l[len(f):] + '\n'
1447 v = fields_dict["Private-Key: ("]
1448 self.modulusLen = None
1450 self.modulusLen = int(v.split(' bit', 1)[0])
1451 if self.modulusLen is None:
1452 raise Exception(error_msg)
1455 v = fields_dict["publicExponent:"]
1458 self.pubExp = long(v.split('(', 1)[0])
1459 if self.pubExp is None:
1460 raise Exception(error_msg)
1463 for k in ["modulus:", "privateExponent:", "prime1:", "prime2:",
1464 "exponent1:", "exponent2:", "coefficient:"]:
1467 s = v.replace('\n', '').replace(' ', '').replace(':', '')
1468 tmp[k] = long(s, 16)
1470 raise Exception(error_msg)
1472 self.modulus = tmp["modulus:"]
1473 self.privExp = tmp["privateExponent:"]
1474 self.prime1 = tmp["prime1:"]
1475 self.prime2 = tmp["prime2:"]
1476 self.exponent1 = tmp["exponent1:"]
1477 self.exponent2 = tmp["exponent2:"]
1478 self.coefficient = tmp["coefficient:"]
1480 self.key = RSA.construct((self.modulus, self.pubExp, self.privExp))
1486 # We inherit from PubKey to get access to all encryption and verification
1487 # methods. To have that working, we simply need Cert to provide
1488 # modulusLen and key attribute.
1489 # XXX Yes, it is a hack.
1490 class Cert(OSSLHelper, _EncryptAndVerify):
1491 # Below are the fields we recognize in the -text output of openssl
1492 # and from which we extract information. We expect them in that
1493 # order. Number of spaces does matter.
1494 possible_fields = [ " Version:",
1496 " Signature Algorithm:",
1501 " Public Key Algorithm:",
1504 " X509v3 Subject Key Identifier:",
1505 " X509v3 Authority Key Identifier:",
1509 " X509v3 Basic Constraints:",
1510 " X509v3 Key Usage:",
1511 " X509v3 Extended Key Usage:",
1512 " X509v3 CRL Distribution Points:",
1513 " Authority Information Access:",
1514 " Signature Algorithm:" ]
1515 possible_fields_count = len(possible_fields)
1517 def __init__(self, certpath):
1518 error_msg = "Unable to import certificate."
1521 for k in self.possible_fields:
1522 fields_dict[k] = None
1524 self.certpath = None
1527 if (not '\x00' in certpath) and os.path.isfile(certpath): # file
1528 self.certpath = certpath
1529 cert_size = os.path.getsize(certpath)
1530 if cert_size > MAX_CERT_SIZE:
1531 raise Exception(error_msg)
1537 raise Exception(error_msg)
1542 raise Exception(error_msg)
1544 self.rawcert = rawcert
1546 # Let's try to get file format : PEM or DER.
1547 fmtstr = 'openssl x509 -text -inform %s -noout '
1548 convertstr = 'openssl x509 -inform %s -outform %s '
1549 cert_header = "-----BEGIN CERTIFICATE-----"
1550 cert_footer = "-----END CERTIFICATE-----"
1551 l = rawcert.split(cert_header, 1)
1552 if len(l) == 2: # looks like PEM
1554 l = tmp.split(cert_footer, 1)
1557 rawcert = "%s%s%s\n" % (cert_header, tmp, cert_footer)
1559 raise Exception(error_msg)
1560 r,w,e = popen2.popen3(fmtstr % "PEM")
1569 self.pemcert = rawcert
1570 self.textcert = textcert
1571 cmd = convertstr % ("PEM", "DER")
1572 self.dercert = self._apply_ossl_cmd(cmd, rawcert)
1574 raise Exception(error_msg)
1575 else: # not PEM, try DER
1576 r,w,e = popen2.popen3(fmtstr % "DER")
1584 self.dercert = rawcert
1585 self.textcert = textcert
1586 cmd = convertstr % ("DER", "PEM")
1587 self.pemcert = self._apply_ossl_cmd(cmd, rawcert)
1588 cmd = convertstr % ("DER", "DER")
1589 self.dercert = self._apply_ossl_cmd(cmd, rawcert)
1591 raise Exception(error_msg)
1593 self.osslcmdbase = 'openssl x509 -inform %s ' % self.format
1595 r,w,e = popen2.popen3('openssl asn1parse -inform DER ')
1596 w.write(self.dercert)
1598 self.asn1parsecert = r.read()
1603 raise Exception(error_msg)
1605 # Grab _raw_ X509v3 Authority Key Identifier, if any.
1606 tmp = self.asn1parsecert.split(":X509v3 Authority Key Identifier", 1)
1607 self.authorityKeyID = None
1610 tmp = tmp.split("[HEX DUMP]:", 1)[1]
1611 self.authorityKeyID=tmp.split('\n',1)[0]
1613 # Grab _raw_ X509v3 Subject Key Identifier, if any.
1614 tmp = self.asn1parsecert.split(":X509v3 Subject Key Identifier", 1)
1615 self.subjectKeyID = None
1618 tmp = tmp.split("[HEX DUMP]:", 1)[1]
1619 self.subjectKeyID=tmp.split('\n',1)[0]
1621 # Get tbsCertificate using the worst hack. output of asn1parse
1624 # 0:d=0 hl=4 l=1298 cons: SEQUENCE
1625 # 4:d=1 hl=4 l=1018 cons: SEQUENCE
1628 l1,l2 = self.asn1parsecert.split('\n', 2)[:2]
1629 hl1 = int(l1.split("hl=",1)[1].split("l=",1)[0])
1630 rem = l2.split("hl=",1)[1]
1631 hl2, rem = rem.split("l=",1)
1633 l = int(rem.split("cons",1)[0])
1634 self.tbsCertificate = self.dercert[hl1:hl1+hl2+l]
1636 # Parse the -text output of openssl to make things available
1637 tmp = self.textcert.split('\n', 2)[2]
1638 l = tmp.split('\n', 1)
1640 raise Exception(error_msg)
1643 k = self.possible_fields[i] # Version:
1644 cur = cur[len(k):] + '\n'
1646 l = tmp.split('\n', 1)
1647 if len(l) != 2: # Over
1648 fields_dict[k] = cur
1653 # skip fields we have already seen, this is the purpose of 'i'
1654 for j in range(i, self.possible_fields_count):
1655 f = self.possible_fields[j]
1657 fields_dict[k] = cur
1658 cur = l[len(f):] + '\n'
1668 v = fields_dict[" Version:"]
1671 self.version = int(v[1:2])
1672 if self.version is None:
1673 raise Exception(error_msg)
1676 v = fields_dict[" Serial Number:"]
1679 v = v.replace('\n', '').strip()
1681 v = v.split("0x", 1)[1].split(')', 1)[0]
1682 v = v.replace(':', '').upper()
1686 if self.serial is None:
1687 raise Exception(error_msg)
1689 # Signature Algorithm
1690 v = fields_dict[" Signature Algorithm:"]
1693 v = v.split('\n',1)[0]
1696 if self.sigAlg is None:
1697 raise Exception(error_msg)
1700 v = fields_dict[" Issuer:"]
1703 v = v.split('\n',1)[0]
1706 if self.issuer is None:
1707 raise Exception(error_msg)
1710 v = fields_dict[" Not Before:"]
1711 self.notBefore_str = None
1713 v = v.split('\n',1)[0]
1715 self.notBefore_str = v
1716 if self.notBefore_str is None:
1717 raise Exception(error_msg)
1718 self.notBefore = time.strptime(self.notBefore_str,
1719 "%b %d %H:%M:%S %Y %Z")
1720 self.notBefore_str_simple = time.strftime("%x", self.notBefore)
1723 v = fields_dict[" Not After :"]
1724 self.notAfter_str = None
1726 v = v.split('\n',1)[0]
1728 self.notAfter_str = v
1729 if self.notAfter_str is None:
1730 raise Exception(error_msg)
1731 self.notAfter = time.strptime(self.notAfter_str,
1732 "%b %d %H:%M:%S %Y %Z")
1733 self.notAfter_str_simple = time.strftime("%x", self.notAfter)
1736 v = fields_dict[" Subject:"]
1739 v = v.split('\n',1)[0]
1742 if self.subject is None:
1743 raise Exception(error_msg)
1745 # Public Key Algorithm
1746 v = fields_dict[" Public Key Algorithm:"]
1747 self.pubKeyAlg = None
1749 v = v.split('\n',1)[0]
1752 if self.pubKeyAlg is None:
1753 raise Exception(error_msg)
1756 v = fields_dict[" Modulus ("]
1759 v,t = v.split(' bit):',1)
1760 self.modulusLen = int(v)
1761 t = t.replace(' ', '').replace('\n', ''). replace(':', '')
1762 self.modulus_hexdump = t
1763 self.modulus = long(t, 16)
1764 if self.modulus is None:
1765 raise Exception(error_msg)
1768 v = fields_dict[" Exponent:"]
1769 self.exponent = None
1771 v = v.split('(',1)[0]
1772 self.exponent = long(v)
1773 if self.exponent is None:
1774 raise Exception(error_msg)
1776 # Public Key instance
1777 self.key = RSA.construct((self.modulus, self.exponent, ))
1779 # Subject Key Identifier
1781 # Authority Key Identifier: keyid, dirname and serial
1782 self.authorityKeyID_keyid = None
1783 self.authorityKeyID_dirname = None
1784 self.authorityKeyID_serial = None
1785 if self.authorityKeyID: # (hex version already done using asn1parse)
1786 v = fields_dict[" keyid:"]
1788 v = v.split('\n',1)[0]
1789 v = v.strip().replace(':', '')
1790 self.authorityKeyID_keyid = v
1791 v = fields_dict[" DirName:"]
1793 v = v.split('\n',1)[0]
1794 self.authorityKeyID_dirname = v
1795 v = fields_dict[" serial:"]
1797 v = v.split('\n',1)[0]
1798 v = v.strip().replace(':', '')
1799 self.authorityKeyID_serial = v
1802 self.basicConstraintsCritical = False
1803 self.basicConstraints=None
1804 v = fields_dict[" X509v3 Basic Constraints:"]
1806 self.basicConstraints = {}
1807 v,t = v.split('\n',2)[:2]
1809 self.basicConstraintsCritical = True
1811 self.basicConstraints["CA"] = t.split('CA:')[1][:4] == "TRUE"
1813 self.basicConstraints["pathlen"] = int(t.split('pathlen:')[1])
1817 v = fields_dict[" X509v3 Key Usage:"]
1819 # man 5 x509v3_config
1820 ku_mapping = {"Digital Signature": "digitalSignature",
1821 "Non Repudiation": "nonRepudiation",
1822 "Key Encipherment": "keyEncipherment",
1823 "Data Encipherment": "dataEncipherment",
1824 "Key Agreement": "keyAgreement",
1825 "Certificate Sign": "keyCertSign",
1826 "CRL Sign": "cRLSign",
1827 "Encipher Only": "encipherOnly",
1828 "Decipher Only": "decipherOnly"}
1829 v = v.split('\n',2)[1]
1830 l = map(lambda x: x.strip(), v.split(','))
1833 if ku_mapping.has_key(c):
1834 self.keyUsage.append(ku_mapping[c])
1836 self.keyUsage.append(c) # Add it anyway
1837 print "Found unknown X509v3 Key Usage: '%s'" % c
1838 print "Report it to arno (at) natisbad.org for addition"
1840 # X509v3 Extended Key Usage
1841 self.extKeyUsage = []
1842 v = fields_dict[" X509v3 Extended Key Usage:"]
1844 # man 5 x509v3_config:
1845 eku_mapping = {"TLS Web Server Authentication": "serverAuth",
1846 "TLS Web Client Authentication": "clientAuth",
1847 "Code Signing": "codeSigning",
1848 "E-mail Protection": "emailProtection",
1849 "Time Stamping": "timeStamping",
1850 "Microsoft Individual Code Signing": "msCodeInd",
1851 "Microsoft Commercial Code Signing": "msCodeCom",
1852 "Microsoft Trust List Signing": "msCTLSign",
1853 "Microsoft Encrypted File System": "msEFS",
1854 "Microsoft Server Gated Crypto": "msSGC",
1855 "Netscape Server Gated Crypto": "nsSGC",
1856 "IPSec End System": "iPsecEndSystem",
1857 "IPSec Tunnel": "iPsecTunnel",
1858 "IPSec User": "iPsecUser"}
1859 v = v.split('\n',2)[1]
1860 l = map(lambda x: x.strip(), v.split(','))
1863 if eku_mapping.has_key(c):
1864 self.extKeyUsage.append(eku_mapping[c])
1866 self.extKeyUsage.append(c) # Add it anyway
1867 print "Found unknown X509v3 Extended Key Usage: '%s'" % c
1868 print "Report it to arno (at) natisbad.org for addition"
1870 # CRL Distribution points
1871 self.cRLDistributionPoints = []
1872 v = fields_dict[" X509v3 CRL Distribution Points:"]
1874 v = v.split("\n\n", 1)[0]
1875 v = v.split("URI:")[1:]
1876 self.CRLDistributionPoints = map(lambda x: x.strip(), v)
1878 # Authority Information Access: list of tuples ("method", "location")
1879 self.authorityInfoAccess = []
1880 v = fields_dict[" Authority Information Access:"]
1882 v = v.split("\n\n", 1)[0]
1883 v = v.split("\n")[1:]
1885 method, location = map(lambda x: x.strip(), e.split(" - ", 1))
1886 self.authorityInfoAccess.append((method, location))
1889 v = fields_dict[" Signature Algorithm:" ]
1892 v = v.split('\n',1)[1]
1893 v = v.replace(' ', '').replace('\n', '')
1894 self.sig = "".join(map(lambda x: chr(int(x, 16)), v.split(':')))
1895 self.sigLen = len(self.sig)
1896 if self.sig is None:
1897 raise Exception(error_msg)
1899 def isIssuerCert(self, other):
1901 True if 'other' issued 'self', i.e.:
1902 - self.issuer == other.subject
1903 - self is signed by other
1905 # XXX should be done on raw values, instead of their textual repr
1906 if self.issuer != other.subject:
1909 # Sanity check regarding modulus length and the
1911 keyLen = (other.modulusLen + 7)/8
1912 if keyLen != self.sigLen:
1915 unenc = other.encrypt(self.sig) # public key encryption, i.e. decrypt
1917 # XXX Check block type (00 or 01 and type of padding)
1919 if not '\x00' in unenc:
1921 pos = unenc.index('\x00')
1922 unenc = unenc[pos+1:]
1925 for k in _hashFuncParams.keys():
1926 if self.sigAlg.startswith(k):
1931 hlen, hfunc, digestInfo = _hashFuncParams[k]
1933 if len(unenc) != (hlen+len(digestInfo)):
1936 if not unenc.startswith(digestInfo):
1940 myh = hfunc(self.tbsCertificate)
1944 def chain(self, certlist):
1946 Construct the chain of certificates leading from 'self' to the
1947 self signed root using the certificates in 'certlist'. If the
1948 list does not provide all the required certs to go to the root
1949 the function returns a incomplete chain starting with the
1950 certificate. This fact can be tested by tchecking if the last
1951 certificate of the returned chain is self signed (if c is the
1952 result, c[-1].isSelfSigned())
1956 # XXX we should check if we have duplicate
1960 while not cur.isSelfSigned():
1961 if d.has_key(cur.issuer):
1962 possible_issuer = d[cur.issuer]
1963 if cur.isIssuerCert(possible_issuer):
1964 res.append(possible_issuer)
1965 cur = possible_issuer
1970 def remainingDays(self, now=None):
1972 Based on the value of notBefore field, returns the number of
1973 days the certificate will still be valid. The date used for the
1974 comparison is the current and local date, as returned by
1975 time.localtime(), except if 'now' argument is provided another
1976 one. 'now' argument can be given as either a time tuple or a string
1977 representing the date. Accepted format for the string version
1980 - '%b %d %H:%M:%S %Y %Z' e.g. 'Jan 30 07:38:59 2008 GMT'
1981 - '%m/%d/%y' e.g. '01/30/08' (less precise)
1983 If the certificate is no more valid at the date considered, then,
1984 a negative value is returned representing the number of days
1985 since it has expired.
1987 The number of days is returned as a float to deal with the unlikely
1988 case of certificates that are still just valid.
1991 now = time.localtime()
1992 elif type(now) is str:
1995 now = time.strptime(now, '%m/%d/%y')
1997 now = time.strptime(now, '%b %d %H:%M:%S %Y %Z')
1999 warning("Bad time string provided '%s'. Using current time" % now)
2000 now = time.localtime()
2002 now = time.mktime(now)
2003 nft = time.mktime(self.notAfter)
2004 diff = (nft - now)/(24.*3600)
2008 # return SHA-1 hash of cert embedded public key
2009 # !! At the moment, the trailing 0 is in the hashed string if any
2011 m = self.modulus_hexdump
2015 while i<l: # get a string version of modulus
2016 res.append(struct.pack("B", int(m[i:i+2], 16)))
2018 return sha.new("".join(res)).digest()
2020 def output(self, fmt="DER"):
2026 return self.textcert
2028 def export(self, filename, fmt="DER"):
2030 Export certificate in 'fmt' format (PEM, DER or TXT) to file 'filename'
2032 f = open(filename, "wb")
2033 f.write(self.output(fmt))
2036 def isSelfSigned(self):
2038 Return True if the certificate is self signed:
2039 - issuer and subject are the same
2040 - the signature of the certificate is valid.
2042 if self.issuer == self.subject:
2043 return self.isIssuerCert(self)
2046 # Print main informations stored in certificate
2048 print "Serial: %s" % self.serial
2049 print "Issuer: " + self.issuer
2050 print "Subject: " + self.subject
2051 print "Validity: %s to %s" % (self.notBefore_str_simple,
2052 self.notAfter_str_simple)
2055 return "[X.509 Cert. Subject:%s, Issuer:%s]" % (self.subject, self.issuer)
2060 def verifychain(self, anchors, untrusted=None):
2062 Perform verification of certificate chains for that certificate. The
2063 behavior of verifychain method is mapped (and also based) on openssl
2064 verify userland tool (man 1 verify).
2065 A list of anchors is required. untrusted parameter can be provided
2066 a list of untrusted certificates that can be used to reconstruct the
2069 If you have a lot of certificates to verify against the same
2070 list of anchor, consider constructing this list as a cafile
2071 and use .verifychain_from_cafile() instead.
2073 cafile = create_temporary_ca_file(anchors)
2076 untrusted_file = None
2078 untrusted_file = create_temporary_ca_file(untrusted) # hack
2079 if not untrusted_file:
2082 res = self.verifychain_from_cafile(cafile,
2083 untrusted_file=untrusted_file)
2086 os.unlink(untrusted_file)
2089 def verifychain_from_cafile(self, cafile, untrusted_file=None):
2091 Does the same job as .verifychain() but using the list of anchors
2092 from the cafile. This is useful (because more efficient) if
2093 you have a lot of certificates to verify do it that way: it
2094 avoids the creation of a cafile from anchors at each call.
2096 As for .verifychain(), a list of untrusted certificates can be
2097 passed (as a file, this time)
2101 u = "-untrusted %s" % untrusted_file
2103 cmd = "openssl verify -CAfile %s %s " % (cafile, u)
2104 pemcert = self.output(fmt="PEM")
2105 cmdres = self._apply_ossl_cmd(cmd, pemcert)
2108 return cmdres.endswith("\nOK\n") or cmdres.endswith(": OK\n")
2110 def verifychain_from_capath(self, capath, untrusted_file=None):
2112 Does the same job as .verifychain_from_cafile() but using the list
2113 of anchors in capath directory. The directory should contain
2114 certificates files in PEM format with associated links as
2115 created using c_rehash utility (man c_rehash).
2117 As for .verifychain_from_cafile(), a list of untrusted certificates
2118 can be passed as a file (concatenation of the certificates in
2123 u = "-untrusted %s" % untrusted_file
2125 cmd = "openssl verify -CApath %s %s " % (capath, u)
2126 pemcert = self.output(fmt="PEM")
2127 cmdres = self._apply_ossl_cmd(cmd, pemcert)
2130 return cmdres.endswith("\nOK\n") or cmdres.endswith(": OK\n")
2132 def is_revoked(self, crl_list):
2134 Given a list of trusted CRL (their signature has already been
2135 verified with trusted anchors), this function returns True if
2136 the certificate is marked as revoked by one of those CRL.
2138 Note that if the Certificate was on hold in a previous CRL and
2139 is now valid again in a new CRL and bot are in the list, it
2140 will be considered revoked: this is because _all_ CRLs are
2141 checked (not only the freshest) and revocation status is not
2144 Also note that the check on the issuer is performed on the
2145 Authority Key Identifier if available in _both_ the CRL and the
2146 Cert. Otherwise, the issuers are simply compared.
2149 if (self.authorityKeyID is not None and
2150 c.authorityKeyID is not None and
2151 self.authorityKeyID == c.authorityKeyID):
2152 return self.serial in map(lambda x: x[0], c.revoked_cert_serials)
2153 elif (self.issuer == c.issuer):
2154 return self.serial in map(lambda x: x[0], c.revoked_cert_serials)
2164 if not c.isSelfSigned():
2165 s = "_ ... [Missing Root]\n"
2167 s += "%s [Self Signed]\n" % c.subject
2171 s += "%s\_ %s" % (" "*i, c.subject)
2179 # a=popen2.Popen3("openssl crl -text -inform DER -noout ", capturestderr=True)
2180 # a.tochild.write(open("samples/klasa1.crl").read())
2184 class CRL(OSSLHelper):
2185 # Below are the fields we recognize in the -text output of openssl
2186 # and from which we extract information. We expect them in that
2187 # order. Number of spaces does matter.
2188 possible_fields = [ " Version",
2189 " Signature Algorithm:",
2194 " X509v3 Issuer Alternative Name:",
2195 " X509v3 Authority Key Identifier:",
2199 " X509v3 CRL Number:",
2200 "Revoked Certificates:",
2201 "No Revoked Certificates.",
2202 " Signature Algorithm:" ]
2203 possible_fields_count = len(possible_fields)
2205 def __init__(self, crlpath):
2206 error_msg = "Unable to import CRL."
2209 for k in self.possible_fields:
2210 fields_dict[k] = None
2215 if (not '\x00' in crlpath) and os.path.isfile(crlpath):
2216 self.crlpath = crlpath
2217 cert_size = os.path.getsize(crlpath)
2218 if cert_size > MAX_CRL_SIZE:
2219 raise Exception(error_msg)
2225 raise Exception(error_msg)
2230 raise Exception(error_msg)
2232 self.rawcrl = rawcrl
2234 # Let's try to get file format : PEM or DER.
2235 fmtstr = 'openssl crl -text -inform %s -noout '
2236 convertstr = 'openssl crl -inform %s -outform %s '
2237 crl_header = "-----BEGIN X509 CRL-----"
2238 crl_footer = "-----END X509 CRL-----"
2239 l = rawcrl.split(crl_header, 1)
2240 if len(l) == 2: # looks like PEM
2242 l = tmp.split(crl_footer, 1)
2245 rawcrl = "%s%s%s\n" % (crl_header, tmp, crl_footer)
2247 raise Exception(error_msg)
2248 r,w,e = popen2.popen3(fmtstr % "PEM")
2257 self.pemcrl = rawcrl
2258 self.textcrl = textcrl
2259 cmd = convertstr % ("PEM", "DER")
2260 self.dercrl = self._apply_ossl_cmd(cmd, rawcrl)
2262 raise Exception(error_msg)
2263 else: # not PEM, try DER
2264 r,w,e = popen2.popen3(fmtstr % "DER")
2272 self.dercrl = rawcrl
2273 self.textcrl = textcrl
2274 cmd = convertstr % ("DER", "PEM")
2275 self.pemcrl = self._apply_ossl_cmd(cmd, rawcrl)
2276 cmd = convertstr % ("DER", "DER")
2277 self.dercrl = self._apply_ossl_cmd(cmd, rawcrl)
2279 raise Exception(error_msg)
2281 self.osslcmdbase = 'openssl crl -inform %s ' % self.format
2283 r,w,e = popen2.popen3('openssl asn1parse -inform DER ')
2284 w.write(self.dercrl)
2286 self.asn1parsecrl = r.read()
2291 raise Exception(error_msg)
2293 # Grab _raw_ X509v3 Authority Key Identifier, if any.
2294 tmp = self.asn1parsecrl.split(":X509v3 Authority Key Identifier", 1)
2295 self.authorityKeyID = None
2298 tmp = tmp.split("[HEX DUMP]:", 1)[1]
2299 self.authorityKeyID=tmp.split('\n',1)[0]
2301 # Parse the -text output of openssl to make things available
2302 tmp = self.textcrl.split('\n', 1)[1]
2303 l = tmp.split('\n', 1)
2305 raise Exception(error_msg)
2308 k = self.possible_fields[i] # Version
2309 cur = cur[len(k):] + '\n'
2311 l = tmp.split('\n', 1)
2312 if len(l) != 2: # Over
2313 fields_dict[k] = cur
2318 # skip fields we have already seen, this is the purpose of 'i'
2319 for j in range(i, self.possible_fields_count):
2320 f = self.possible_fields[j]
2322 fields_dict[k] = cur
2323 cur = l[len(f):] + '\n'
2333 v = fields_dict[" Version"]
2336 self.version = int(v[1:2])
2337 if self.version is None:
2338 raise Exception(error_msg)
2340 # signature algorithm
2341 v = fields_dict[" Signature Algorithm:"]
2344 v = v.split('\n',1)[0]
2347 if self.sigAlg is None:
2348 raise Exception(error_msg)
2351 v = fields_dict[" Issuer:"]
2354 v = v.split('\n',1)[0]
2357 if self.issuer is None:
2358 raise Exception(error_msg)
2361 v = fields_dict[" Last Update:"]
2362 self.lastUpdate_str = None
2364 v = v.split('\n',1)[0]
2366 self.lastUpdate_str = v
2367 if self.lastUpdate_str is None:
2368 raise Exception(error_msg)
2369 self.lastUpdate = time.strptime(self.lastUpdate_str,
2370 "%b %d %H:%M:%S %Y %Z")
2371 self.lastUpdate_str_simple = time.strftime("%x", self.lastUpdate)
2374 v = fields_dict[" Next Update:"]
2375 self.nextUpdate_str = None
2377 v = v.split('\n',1)[0]
2379 self.nextUpdate_str = v
2380 if self.nextUpdate_str is None:
2381 raise Exception(error_msg)
2382 self.nextUpdate = time.strptime(self.nextUpdate_str,
2383 "%b %d %H:%M:%S %Y %Z")
2384 self.nextUpdate_str_simple = time.strftime("%x", self.nextUpdate)
2386 # XXX Do something for Issuer Alternative Name
2388 # Authority Key Identifier: keyid, dirname and serial
2389 self.authorityKeyID_keyid = None
2390 self.authorityKeyID_dirname = None
2391 self.authorityKeyID_serial = None
2392 if self.authorityKeyID: # (hex version already done using asn1parse)
2393 v = fields_dict[" keyid:"]
2395 v = v.split('\n',1)[0]
2396 v = v.strip().replace(':', '')
2397 self.authorityKeyID_keyid = v
2398 v = fields_dict[" DirName:"]
2400 v = v.split('\n',1)[0]
2401 self.authorityKeyID_dirname = v
2402 v = fields_dict[" serial:"]
2404 v = v.split('\n',1)[0]
2405 v = v.strip().replace(':', '')
2406 self.authorityKeyID_serial = v
2409 v = fields_dict[" X509v3 CRL Number:"]
2412 v = v.split('\n',2)[1]
2414 self.number = int(v)
2416 # Get the list of serial numbers of revoked certificates
2417 self.revoked_cert_serials = []
2418 v = fields_dict["Revoked Certificates:"]
2419 t = fields_dict["No Revoked Certificates."]
2420 if (t is None and v is not None):
2421 v = v.split("Serial Number: ")[1:]
2423 s,d = r.split('\n', 1)
2424 s = s.split('\n', 1)[0]
2425 d = d.split("Revocation Date:", 1)[1]
2426 d = time.strptime(d.strip(), "%b %d %H:%M:%S %Y %Z")
2427 self.revoked_cert_serials.append((s,d))
2430 v = fields_dict[" Signature Algorithm:" ]
2433 v = v.split('\n',1)[1]
2434 v = v.replace(' ', '').replace('\n', '')
2435 self.sig = "".join(map(lambda x: chr(int(x, 16)), v.split(':')))
2436 self.sigLen = len(self.sig)
2437 if self.sig is None:
2438 raise Exception(error_msg)
2443 # Print main informations stored in CRL
2445 print "Version: %d" % self.version
2446 print "sigAlg: " + self.sigAlg
2447 print "Issuer: " + self.issuer
2448 print "lastUpdate: %s" % self.lastUpdate_str_simple
2449 print "nextUpdate: %s" % self.nextUpdate_str_simple
2451 def verify(self, anchors):
2453 Return True if the CRL is signed by one of the provided
2454 anchors. False on error (invalid signature, missing anchorand, ...)
2456 cafile = create_temporary_ca_file(anchors)
2460 cmd = self.osslcmdbase + '-noout -CAfile %s 2>&1' % cafile
2461 cmdres = self._apply_ossl_cmd(cmd, self.rawcrl)
2466 return "verify OK" in cmdres