1 """
2 This modules implements the estimate, serial and parrallel templates for all analysis.
3 """
4
5
6 import getpass
7 import os
8 import subprocess
9 import sys
10 from time import asctime, sleep
11 from timeit import default_timer
12
13
14 from Scientific.DistributedComputing.MasterSlave import initializeMasterProcess, TaskRaisedException
15
16
17 from nMOLDYN.Core.Error import Error
18 from nMOLDYN.Core.Logger import LogMessage
19 from nMOLDYN.Analysis import Analysis
20 from nMOLDYN.Analysis.Dynamics import *
21 from nMOLDYN.Analysis.NMR import *
22 from nMOLDYN.Analysis.Scattering import *
23 from nMOLDYN.Analysis.Structure import *
24
25
26 nmoldyn_package_path = os.path.dirname(os.path.split(__file__)[0])
27
29 """Starts the slaves.
30
31 @param pyroServer: the type of pyro server. One of 'multiprocessor' or 'cluster'.
32 @type pyroServer: string.
33
34 @param pyroNodes: a dictionnary whose keys are the name of the nodes and the value
35 the number of cpus to allocate to this node.
36 @type pyroNodes: dict.
37 """
38
39 taskmanager = os.path.join(os.path.dirname(sys.executable), 'task_manager')
40
41
42 if pyroServer == 'multiprocessor':
43 for k, v in pyroNodes.items():
44 [subprocess.Popen([taskmanager, 'slave', taskName]) for p in range(v)]
45
46 elif pyroServer == 'cluster':
47 raise NotImplementedError
48
49
50
51
53 """Template class for an analysis atom-by-atom ran in serial mode.
54 """
55
57 """Performs the analysis in serial mode.
58 """
59
60
61 if self.estimate:
62 self.offset = default_timer()
63
64 first = True
65 for atom in self.subset:
66 junk = self.trajectory.readParticleTrajectory(atom, first = self.first, last = self.last, skip = self.skip).array
67 if first:
68 timeToReadFirstAtom = default_timer() - self.offset
69 first = False
70
71 self.offset = default_timer() - self.offset - self.nAtoms*timeToReadFirstAtom
72
73 for atom in self.subset:
74 self.chrono = default_timer()
75 x = self.calc(atom, self.trajectoryFilename)
76 self.combine(atom,x)
77 self.updateJobProgress(1)
78 break
79
80
81
82
83
84
85
86 self.chrono = int(self.offset + self.nAtoms*(default_timer() - self.chrono))
87
88 else:
89
90 self.chrono = default_timer()
91
92 for atom in self.subset:
93 x = self.calc(atom, self.trajectoryFilename)
94 self.combine(atom,x)
95 self.updateJobProgress(self.nAtoms)
96
97 self.finalize()
98
99
100 self.chrono = int(default_timer() - self.chrono)
101
103 """Template class for an analysis atom-by-atom ran in parallel mode.
104 """
105
107 """Performs the analysis in parallel mode.
108 """
109
110 LogMessage('info', 'Setting Pyro name server. Please wait ...', ['console'])
111
112 s = subprocess.Popen([sys.executable, '-m', 'Pyro.naming'], stdout = subprocess.PIPE)
113
114
115 sleep(3)
116
117
118 self.taskName = '%s_%s_%s' % (self.shortName, getpass.getuser(), '_'.join(asctime().split()))
119
120
121 script = os.path.abspath(os.path.join(os.path.dirname(__file__), 'Slave.py'))
122 tasks = initializeMasterProcess(self.taskName, slave_script = script, use_name_server = True)
123
124 for atom in self.subset:
125 task_id = tasks.requestTask("analysisPerElement", self, atom, self.trajectoryFilename)
126
127 startSlaves(self.taskName, self.pyroServer, self.pyroNodes)
128
129 LogMessage('info', 'Pyro name server ready.', ['console'])
130
131
132 self.chrono = default_timer()
133
134 for atom in self.subset:
135
136 try:
137 task_id, tag, x = tasks.retrieveResult("analysisPerElement")
138 self.combine(atom,x)
139 self.updateJobProgress(self.nAtoms)
140
141 except:
142 raise Error('Error when retrieving the results over the pyroserver.')
143
144 self.finalize()
145
146
147 self.chrono = int(default_timer() - self.chrono)
148
149
150
151
153 """Template class for an analysis atom-by-atom ran in serial mode.
154 """
155
157 """Performs the analysis in serial mode.
158 """
159
160 if self.estimate:
161 self.offset = default_timer()
162
163 first = True
164 for frameIndex in range(self.nFrames):
165 frame = self.frameIndexes[frameIndex]
166 self.universe.setFromTrajectory(self.trajectory, frame)
167 if first:
168 timeToReadFirstFrame = default_timer() - self.offset
169 first = False
170
171 self.offset = default_timer() - self.offset - self.nFrames*timeToReadFirstFrame
172
173
174 for frameIndex in range(self.nFrames):
175 self.chrono = default_timer()
176 x = self.calc(frameIndex, self.trajectoryFilename)
177 self.combine(frameIndex,x)
178 self.updateJobProgress(1)
179 break
180
181
182
183
184
185
186
187 self.chrono = int(self.offset + self.nFrames*(default_timer() - self.chrono))
188
189 else:
190
191 self.chrono = default_timer()
192
193 for frameIndex in range(self.nFrames):
194 x = self.calc(frameIndex, self.trajectoryFilename)
195 self.combine(frameIndex,x)
196 self.updateJobProgress(self.nFrames)
197
198 self.finalize()
199
200
201 self.chrono = int(default_timer() - self.chrono)
202
204 """Template class for an analysis frame-by-frame ran in parallel mode.
205 """
206
208 """Performs the analysis in parallel mode.
209 """
210
211 LogMessage('info', 'Setting Pyro name server. Please wait ...', ['console'])
212
213 s = subprocess.Popen([sys.executable, '-m', 'Pyro.naming'], stdout = subprocess.PIPE)
214
215
216 sleep(3)
217
218
219 self.taskName = '%s_%s_%s' % (self.shortName, getpass.getuser(), '_'.join(asctime().split()))
220
221
222 script = os.path.abspath(os.path.join(os.path.dirname(__file__), 'Slave.py'))
223 tasks = initializeMasterProcess(self.taskName, slave_script = script, use_name_server = True)
224
225 for frameIndex in range(self.nFrames):
226 task_id = tasks.requestTask("analysisPerElement", self, frameIndex, self.trajectoryFilename)
227
228 startSlaves(self.taskName, self.pyroServer, self.pyroNodes)
229
230 LogMessage('info', 'Pyro name server ready.', ['console'])
231
232
233 self.chrono = default_timer()
234
235 for frameIndex in range(self.nFrames):
236 try:
237 task_id, tag, x = tasks.retrieveResult("analysisPerElement")
238 self.combine(frameIndex,x)
239 self.updateJobProgress(self.nFrames)
240
241 except:
242 raise Error('Error when retrieving the results over the pyroserver.')
243
244 self.finalize()
245
246
247 self.chrono = int(default_timer() - self.chrono)
248
249
250
251
253 """Template class for an analysis group-by-group ran in serial mode.
254 """
255
257 """Performs the analysis in serial mode.
258 """
259
260 if self.estimate:
261 self.offset = default_timer()
262
263 first = True
264 for groupIndex in range(self.nGroups):
265 g = self.group[groupIndex]
266 junk = self.trajectory.readRigidBodyTrajectory(g,\
267 first = self.first,\
268 last = self.last,\
269 skip = self.skip,\
270 reference = self.refConfig)
271 if first:
272 timeToReadFirstGroup = default_timer() - self.offset
273 first = False
274
275 self.offset = default_timer() - self.offset - self.nGroups*timeToReadFirstGroup
276
277
278 for groupIndex in range(self.nGroups):
279 self.chrono = default_timer()
280 x = self.calc(groupIndex, self.trajectoryFilename)
281 self.combine(groupIndex, x)
282 self.updateJobProgress(1)
283 break
284
285
286
287
288
289
290
291 self.chrono = int(self.offset + self.nGroups*(default_timer() - self.chrono))
292
293 else:
294
295 self.chrono = default_timer()
296
297 for groupIndex in range(self.nGroups):
298 x = self.calc(groupIndex, self.trajectoryFilename)
299 self.combine(groupIndex, x)
300 self.updateJobProgress(self.nGroups)
301
302 self.finalize()
303
304
305 self.chrono = int(default_timer() - self.chrono)
306
308 """Template class for an analysis group-by-group ran in parallel mode.
309 """
310
312 """Performs the analysis in parallel mode.
313 """
314
315 LogMessage('info', 'Setting Pyro name server. Please wait ...', ['console'])
316
317 s = subprocess.Popen([sys.executable, '-m', 'Pyro.naming'], stdout = subprocess.PIPE)
318
319
320 sleep(3)
321
322
323 self.taskName = '%s_%s_%s' % (self.shortName, getpass.getuser(), '_'.join(asctime().split()))
324
325
326 script = os.path.abspath(os.path.join(os.path.dirname(__file__), 'Slave.py'))
327 tasks = initializeMasterProcess(self.taskName, slave_script = script, use_name_server = True)
328
329 for groupIndex in range(self.nGroups):
330 task_id = tasks.requestTask("analysisPerElement", self, groupIndex, self.trajectoryFilename)
331
332 startSlaves(self.taskName, self.pyroServer, self.pyroNodes)
333
334 LogMessage('info', 'Pyro name server ready.', ['console'])
335
336
337 self.chrono = default_timer()
338
339 for groupIndex in range(self.nGroups):
340 try:
341 task_id, tag, x = tasks.retrieveResult("analysisPerElement")
342 self.combine(groupIndex, x)
343 self.updateJobProgress(self.nGroups)
344
345 except:
346 raise Error('Error when retrieving the results over the pyroserver.')
347
348 self.finalize()
349
350
351 self.chrono = int(default_timer() - self.chrono)
352
354 """Template class for an analysis qshell-by-qshell ran in serial mode.
355 """
356
358 """Performs the analysis in serial mode.
359 """
360
361 self.chrono = default_timer()
362
363 for qIndex in range(self.nQValues):
364 x = self.calc(qIndex, self.trajectoryFilename)
365 self.combine(qIndex, x)
366 self.updateJobProgress(self.nQValues)
367
368 self.finalize()
369
370
371 self.chrono = default_timer()
372
374 """Template class for an analysis qshell-by-qshell ran in parallel mode.
375 """
376
378 """Performs the analysis in parallel mode.
379 """
380
381 LogMessage('info', 'Setting Pyro name server. Please wait ...', ['console'])
382
383 s = subprocess.Popen([sys.executable, '-m', 'Pyro.naming'], stdout = subprocess.PIPE)
384
385 sleep(3)
386
387
388 self.taskName = '%s_%s_%s' % (self.shortName, getpass.getuser(), '_'.join(asctime().split()))
389
390
391 script = os.path.abspath(os.path.join(os.path.dirname(__file__), 'Slave.py'))
392 tasks = initializeMasterProcess(self.taskName, slave_script = script, use_name_server = True)
393
394 for qIndex in range(self.nQValues):
395 task_id = tasks.requestTask("analysisPerElement", self, qIndex, self.trajectoryFilename)
396
397 startSlaves(self.taskName, self.pyroServer, self.pyroNodes)
398
399 LogMessage('info', 'Pyro name server ready.', ['console'])
400
401
402 self.chrono = default_timer()
403
404 for qIndex in range(self.nQValues):
405 try:
406 task_id, tag, x = tasks.retrieveResult("analysisPerElement")
407 self.combine(qIndex, x)
408 self.updateJobProgress(self.nQValues)
409
410 except:
411 raise Error('Error when retrieving the results over the pyroserver.')
412
413 self.finalize()
414
415
416 self.chrono = int(default_timer() - self.chrono)
417
418
419
420
421
422
423
426
429
430
431
432
435
438
439
440
441
444
447
448
449
450
453
456
457
458
459
462
465
466
467
468
471
474
475
476
477
480
483
484
485
486
489
492
493
494
495
498
501
502
503
504
507
510
511
512
513
516
519
520
521
522
525
528
529
530
531
534
537
538
539
540
543
546
547
548
549
552
555
556
557
558
561
564
565
566
567
570
573
574
575
576
579
582
583
584
585
588
591
592
593
594
597
600
601
602
603
604 -class RigidBodyTrajectory_serial(Analysis, RigidBodyTrajectory, SerialPerGroup):
606
607 -class RigidBodyTrajectory_parallel(Analysis, RigidBodyTrajectory, ParallelPerGroup):
609
610
611
612
615
618
619
620
621
624
627
628
629
630
633
636
637
638
639
642
645
646
647
648
651
652
653
654
657
660
661
662
663
666
669
670
671
672
675
678