1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
import io
from enum import Enum
from _sequoia import ffi, lib
from .error import Error
from .glue import SQObject, invoke
class NetworkPolicy(Enum):
Offline = lib.SQ_NETWORK_POLICY_OFFLINE
Anonymized = lib.SQ_NETWORK_POLICY_ANONYMIZED
Encrypted = lib.SQ_NETWORK_POLICY_ENCRYPTED
Insecure = lib.SQ_NETWORK_POLICY_INSECURE
class IPCPolicy(Enum):
External = lib.SQ_IPC_POLICY_EXTERNAL
Internal = lib.SQ_IPC_POLICY_INTERNAL
Robust = lib.SQ_IPC_POLICY_ROBUST
class Context(SQObject):
_del = lib.sq_context_free
def __init__(self,
home=None,
network_policy=NetworkPolicy.Encrypted,
ipc_policy=IPCPolicy.Robust,
ephemeral=False):
cfg = lib.sq_context_configure()
if home:
lib.sq_config_home(cfg, home.encode())
lib.sq_config_network_policy(cfg, network_policy.value)
lib.sq_config_ipc_policy(cfg, ipc_policy.value)
if ephemeral:
lib.sq_config_ephemeral(cfg)
err = ffi.new("pgp_error_t[1]")
o = lib.sq_config_build(cfg, err)
if o == ffi.NULL:
raise Error._from(err[0])
super(Context, self).__init__(o)
class AbstractReader(SQObject, io.RawIOBase):
_del = lib.pgp_reader_free
def readable(self):
return True
def writable(self):
return False
def readinto(self, buf):
return invoke(
lib.pgp_reader_read,
self.ref(),
ffi.cast("uint8_t *", ffi.from_buffer(buf)),
len(buf))
def close(self):
self._delete()
# Implement the context manager protocol.
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
return False
class Reader(AbstractReader):
@classmethod
def open(cls, ctx, filename):
return Reader(
invoke(lib.pgp_reader_from_file,
filename.encode()),
context=ctx)
@classmethod
def from_fd(cls, ctx, fd):
return Reader(lib.pgp_reader_from_fd(fd),
context=ctx)
@classmethod
def from_bytes(cls, ctx, buf):
return Reader(
lib.pgp_reader_from_bytes(
ffi.cast("uint8_t *", ffi.from_buffer(buf)), len(buf)),
context=ctx)
class AbstractWriter(SQObject, io.RawIOBase):
_del = lib.pgp_writer_free
def readable(self):
return False
def writable(self):
return True
def write(self, buf):
return invoke(
lib.pgp_writer_write,
self.ref(),
ffi.cast("const uint8_t *", ffi.from_buffer(buf)),
len(buf))
def close(self):
self._delete()
# Implement the context manager protocol.
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
return False
class Writer(AbstractWriter):
@classmethod
def open(cls, ctx, filename):
return Writer(
invoke(lib.pgp_writer_from_file,
filename.encode()),
context=ctx)
@classmethod
def from_fd(cls, ctx, fd):
return Writer(lib.pgp_writer_from_fd(fd),
context=ctx)
@classmethod
def from_bytes(cls, ctx, buf):
return Writer(
lib.pgp_writer_from_bytes(
ffi.cast("uint8_t *", ffi.from_buffer(buf)), len(buf)),
context=ctx)
|